How to retrieve values from a function run in parallel processes?

























The multiprocessing module is quite confusing for Python beginners, especially for those who have just migrated from MATLAB and have been made lazy by its Parallel Computing Toolbox. I have the following code, which takes ~80 seconds to run, and I want to shorten this time by using Python's multiprocessing module.



from time import time

xmax = 100000000

start = time()
for x in range(xmax):
    y = ((x+5)**2+x-40)
    if y <= 0xf+1:
        print('Condition met at: ', y, x)
end = time()
tt = end-start #total time
print('Each iteration took: ', tt/xmax)
print('Total time: ', tt)


This outputs as expected:



Condition met at: -15 0
Condition met at: -3 1
Condition met at: 11 2
Each iteration took: 8.667453265190124e-07
Total time: 86.67453265190125


Since no iteration of the loop depends on any other, I tried to adapt this Server Process example from the official documentation to scan chunks of the range in separate processes. Eventually I came across vartec's answer to this question and was able to prepare the following code. I also updated the code based on Darkonaut's response to the current question.



from time import time
import multiprocessing as mp

def chunker (rng, t): # this function makes t chunks out of rng
    L = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h = rng[0]-1
    chunks = []
    for i in range(0, t):
        c = [h+1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t-1][1] += Lr + 1
    return chunks

def worker(lock, xrange, return_dict):
    '''worker function'''
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            return_dict['x'].append(x)
            return_dict['y'].append(y)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y

if __name__ == '__main__':
    start = time()
    manager = mp.Manager()
    return_dict = manager.dict()
    lock = manager.Lock()
    return_dict['x']=manager.list()
    return_dict['y']=manager.list()
    xmax = 100000000
    nw = mp.cpu_count()
    workers = list(range(0, nw))
    chunks = chunker([0, xmax], nw)
    jobs = []
    for i in workers:
        p = mp.Process(target=worker, args=(lock, chunks[i],return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    end = time()
    tt = end-start #total time
    print('Each iteration took: ', tt/xmax)
    print('Total time: ', tt)
    print(return_dict['x'])
    print(return_dict['y'])


This considerably reduces the run time to ~17 seconds. However, my shared variable does not receive any values. Please help me find out which part of the code is going wrong.



The output I get is:



Each iteration took: 1.7742713451385497e-07
Total time: 17.742713451385498




whereas I expect:



Each iteration took: 1.7742713451385497e-07
Total time: 17.742713451385498
[0, 1, 2]
[-15, -3, 11]

































  • To share data, you can use an in-memory database like redis or memcache, a pipe/socket, or perhaps the return value of your process

    – Skapin
    Nov 13 '18 at 19:58












  • Or check this; does it help? stackoverflow.com/questions/35157367/…

    – Skapin
    Nov 13 '18 at 20:00


















































python python-3.x parallel-processing multiprocessing python-multiprocessing














edited Dec 14 '18 at 14:22 by Darkonaut
asked Nov 13 '18 at 19:29 by PouJa
























1 Answer
































The issue in your example is that modifications to standard mutable structures within a Manager.dict will not be propagated. I'll first show you how to fix it with Manager, and then show you better options.



multiprocessing.Manager is a bit heavy, since it uses a separate process just for the manager, and working on a shared object requires locks for data consistency. If you run this on one machine, there are better options: multiprocessing.Pool if you don't have to run customized Process classes, and, if you do, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.



The quoted parts are from the multiprocessing docs.




Manager




If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a setitem on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...
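To make the quoted behaviour concrete, here is a minimal sketch that stores a plain list inside a manager.dict and compares what is visible before and after re-assigning it. The function name demo_plain_list_in_manager_dict and the key 'x' are only illustrative and are not part of the question's code.

```python
from multiprocessing import Manager


def demo_plain_list_in_manager_dict():
    """Return the list as seen through the proxy before and after re-assignment."""
    with Manager() as manager:
        shared = manager.dict()
        shared['x'] = []  # a plain (non-proxy) list stored inside the dict proxy

        # shared['x'] hands back a copy, so this append never reaches the manager.
        shared['x'].append(1)
        before = list(shared['x'])

        # Fetch, modify, store back: the __setitem__ call is what propagates.
        tmp = shared['x']
        tmp.append(1)
        shared['x'] = tmp
        after = list(shared['x'])
    return before, after


if __name__ == '__main__':
    print(demo_plain_list_in_manager_dict())  # ([], [1])
```

The first append is silently lost, which is the same symptom as an empty result at the end of a run.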




In your case this would look like:



def worker(xrange, return_dict, lock):
    """worker function"""
    for x in range(xrange[0], xrange[1]):
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                list_x = return_dict['x']
                list_y = return_dict['y']
                list_x.append(x)
                list_y.append(y)
                return_dict['x'] = list_x
                return_dict['y'] = list_y


The lock here would be a manager.Lock instance that you have to pass along as an argument, since the whole (now locked) operation is not by itself atomic. (Here is an easier example with Manager using Lock.)




This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization.




Since Python 3.6 proxy objects are nestable:




Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.




Since Python 3.6, you can fill your manager.dict with manager.list values before starting the processes and then append directly in the worker without having to reassign.



return_dict['x'] = manager.list()
return_dict['y'] = manager.list()


EDIT:



Here is the full example with Manager:



import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_utils.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below

def worker(batch_range, return_dict, lock):
    """worker function"""
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            with lock:
                return_dict['x'].append(x)
                return_dict['y'].append(y)


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with Manager() as manager:
        lock = manager.Lock()
        return_dict = manager.dict()
        return_dict['x'] = manager.list()
        return_dict['y'] = manager.list()

        tasks = [(batch_range, return_dict, lock)
                 for batch_range in batch_ranges]

        with context_timer():

            pool = [Process(target=worker, args=args)
                    for args in tasks]

            for p in pool:
                p.start()
            for p in pool:
                p.join()

        # Create standard container with data from manager before exiting
        # the manager.
        result = {k: list(v) for k, v in return_dict.items()}

    print(result)



Pool



Most often a multiprocessing.Pool will just do it. You have an additional challenge in your example, since you want to distribute iteration over a range.
Your chunker function doesn't manage to divide the range evenly, so that every process has about the same amount of work to do:



chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]] # 4, 4, 4, 6!
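One way to check the chunker output above is to count what each chunk actually iterates over. The sketch below reproduces the question's chunker and assumes, as in the question's worker, that each pair is fed to range(start, stop), which excludes stop. covered_values is a hypothetical helper added only for this check.

```python
def chunker(rng, t):
    """The question's chunker: makes t [start, stop] pairs out of rng."""
    L = rng[1] - rng[0]
    Lr = L % t
    Lm = L // t
    h = rng[0] - 1
    chunks = []
    for i in range(0, t):
        c = [h + 1, h + Lm]
        h += Lm
        chunks.append(c)
    chunks[t - 1][1] += Lr + 1
    return chunks


def covered_values(chunks):
    """Values visited when every chunk is iterated with range(start, stop)."""
    return [x for start, stop in chunks for x in range(start, stop)]


chunks = chunker((0, 21), 4)
sizes = [len(range(start, stop)) for start, stop in chunks]
missing = sorted(set(range(21)) - set(covered_values(chunks)))
print(chunks)   # [[0, 4], [5, 9], [10, 14], [15, 21]]
print(sizes)    # [4, 4, 4, 6]
print(missing)  # [4, 9, 14]
```

Under that assumption the chunks are not only uneven: the last value of every chunk except the final one is never visited.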


For the code below, please grab the code snippet for mp_utils.py from my answer here; it provides two functions to chunk ranges as evenly as possible.
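The linked mp_utils.py is not reproduced on this page. As a rough sketch of what two such helpers might look like, the following uses the names and call signatures seen in the code below (calc_batch_sizes(X_MAX, n_workers=N_WORKERS) and build_batch_ranges(batch_sizes)); the bodies are an assumption, not the code from the linked answer.

```python
from itertools import accumulate


def calc_batch_sizes(n_tasks, n_workers):
    """Split n_tasks into n_workers batch sizes that differ by at most 1."""
    base, extra = divmod(n_tasks, n_workers)
    return [base + 1] * extra + [base] * (n_workers - extra)


def build_batch_ranges(batch_sizes):
    """Turn batch sizes into consecutive, non-overlapping range objects."""
    upper_bounds = list(accumulate(batch_sizes))
    lower_bounds = [0] + upper_bounds[:-1]
    return [range(lo, hi) for lo, hi in zip(lower_bounds, upper_bounds)]


print(calc_batch_sizes(21, 4))                      # [6, 5, 5, 5]
print(build_batch_ranges(calc_batch_sizes(21, 4)))
# [range(0, 6), range(6, 11), range(11, 16), range(16, 21)]
```

Because the helpers return range objects, every value is covered exactly once and a worker can iterate over its batch directly.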



With multiprocessing.Pool, your worker function just has to return the result, and Pool will take care of transporting it over internal queues back to the parent process. The result of pool.map will be a list with one entry per batch, so you will have to rearrange it into the form you want. Your example could then look like this:



import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
    start_time = time.perf_counter()
    yield
    end_time = time.perf_counter()
    total_time = end_time-start_time
    print(f'\nEach iteration took: {total_time / X_MAX:.4f} s')
    print(f'Total time: {total_time:.4f} s\n')


def worker(batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    return result


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():
        with Pool(N_WORKERS) as pool:
            results = pool.map(worker, iterable=batch_ranges)

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results)) # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')


Example Output:



[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at: -15 0
Condition met at: -3 1
Condition met at: 11 2

Each iteration took: 0.0000 s
Total time: 8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0
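The flattening step x, y = zip(*chain.from_iterable(results)) works for the output above, but it would raise a ValueError if no batch produced any pair, because zip() with no arguments yields nothing to unpack. A small sketch of a more defensive variant follows; split_pairs is a hypothetical helper name, not something the Pool example defines.

```python
from itertools import chain


def split_pairs(results):
    """Flatten per-batch lists of (x, y) pairs into an x tuple and a y tuple."""
    pairs = list(chain.from_iterable(results))
    if not pairs:
        # Nothing matched in any batch: return two empty tuples instead of failing.
        return (), ()
    xs, ys = zip(*pairs)
    return xs, ys


results = [[(0, -15), (1, -3), (2, 11)], [], [], []]
print(split_pairs(results))       # ((0, 1, 2), (-15, -3, 11))
print(split_pairs([[], [], []]))  # ((), ())
```

Since pool.map returns results in the order of its input batches, the flattened x values come out in ascending order without any extra sorting.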


If you had multiple arguments for your worker, you would build a "tasks" list of argument tuples and replace pool.map(...) with pool.starmap(..., iterable=tasks). See the docs for further details.
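As a minimal sketch of the starmap variant, the worker below takes a second argument. The threshold parameter is invented for illustration (the original worker hard-codes 0xf+1), and the two tiny batch ranges only keep the run short.

```python
from multiprocessing import Pool


def worker(batch_range, threshold):
    """Return the (x, y) pairs of batch_range whose y is at most threshold."""
    result = []
    for x in batch_range:
        y = ((x + 5) ** 2 + x - 40)
        if y <= threshold:
            result.append((x, y))
    return result


if __name__ == '__main__':
    batch_ranges = [range(0, 5), range(5, 10)]
    # One argument tuple per call: worker(batch_range, threshold)
    tasks = [(batch_range, 0xf + 1) for batch_range in batch_ranges]
    with Pool(2) as pool:
        results = pool.starmap(worker, iterable=tasks)
    print(results)  # [[(0, -15), (1, -3), (2, 11)], []]
```

starmap unpacks each tuple into positional arguments, so the worker keeps an ordinary signature.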




Process & Queue



If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication (IPC) yourself, by passing a multiprocessing.Queue as an argument to your worker functions in the child processes and letting them enqueue their results to be sent back to the parent.



You will also have to build your own Pool-like structure so you can iterate over it to start and join the processes, and you have to get() the results back from the queue. I've written more about Queue.get usage here.



A solution with this approach could look like this:



import multiprocessing as mp
from multiprocessing import Process
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section above

def worker(result_queue, batch_range):
    """worker function"""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result) # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue() # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        # get() one result per process before joining
        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results)) # filter and sort results
    print(f'results sorted: x: {x}, y: {y}')
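Two details of the Queue approach may be worth spelling out. First, the results are fetched with get() before the processes are joined; the multiprocessing docs warn that a process which has put items on a queue may not terminate until those items have been flushed, so joining first can hang. Second, results arrive in completion order rather than batch order. The sketch below tags each result with a batch index so the parent can restore the order; batch_index and in_batch_order are hypothetical additions, and the small ranges only keep the run short.

```python
import multiprocessing as mp


def worker(result_queue, batch_index, batch_range):
    """Put (batch_index, pairs) on the queue so the parent can reorder."""
    result = []
    for x in batch_range:
        y = ((x + 5) ** 2 + x - 40)
        if y <= 0xf + 1:
            result.append((x, y))
    result_queue.put((batch_index, result))


def in_batch_order(tagged_results):
    """Sort (batch_index, pairs) tuples by index and drop the index."""
    return [pairs for _, pairs in sorted(tagged_results, key=lambda t: t[0])]


if __name__ == '__main__':
    batch_ranges = [range(0, 5), range(5, 10)]
    result_queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(result_queue, i, r))
             for i, r in enumerate(batch_ranges)]
    for p in procs:
        p.start()
    # Drain the queue first, then join.
    tagged = [result_queue.get() for _ in procs]
    for p in procs:
        p.join()
    print(in_batch_order(tagged))
```

With Pool.map this bookkeeping is unnecessary, since map already returns results in input order.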






























  • I am surprised by such a comprehensive answer. Thank you so much indeed. I still cannot make it work with Manager(), so I updated the question with a few lines of code added as you recommended. Anyway, the multiprocessing.Pool() approach seems interesting. But before taking one of these approaches, I would like to know which one is better for further development of heavier tasks. For example, I am aiming to run a loop that normally takes 10 days as a single process. I want to run that loop over a cluster of 4 PCs connected through a LAN and make use of all the CPU cores they have.

    – PouJa
    Nov 14 '18 at 6:54











  • @Pouya Jamali Please see my edit. Referring to your deleted answer: it's not that you get an exception when you don't use a lock; you can just end up losing data when multiple processes happen to read the list at the same time, change it and write it back. The last process in this group will then overwrite all changes the other processes have made, because at the time it read the list, it hadn't been updated with the results from the other processes. In your example just one process has results to append, so you will never observe it with this data.

    – Darkonaut
    Nov 14 '18 at 11:11











  • @Pouya Jamali Within the StdLib you would have to use Manager if you need to share data over multiple machines. But you probably should look into dask for working with a cluster.

    – Darkonaut
    Nov 14 '18 at 11:12











  • Even with this latest edit I still cannot get the 3 pairs back. Only one pair is returned which is [0] [-15]. What does contextmanager do here?

    – PouJa
    Nov 14 '18 at 12:21







  • Now it is working. Sorry, I was insisting on using my own chunker function. I cannot understand why it makes a difference!

    – PouJa
    Nov 14 '18 at 14:45



































1 Answer
1






active

oldest

votes








1 Answer
1






active

oldest

votes









active

oldest

votes






active

oldest

votes









3














The issue in your example is that modifications to standard mutable structures within Manager.dict will not be propagated. I'm first showing you how to fix it with manager, just to show you better options afterwards.



multiprocessing.Manager is a bit heavy since it uses a separate Process just for the Manager and working on a shared object needs using locks for data consistency. If you run this on one machine, there are better options with multiprocessing.Pool, in case you don't have to run customized Process classes and if you have to, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.



The quoting parts are from the multiprocessing docs.




Manager




If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a setitem on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...




In your case this would look like:



def worker(xrange, return_dict, lock):
"""worker function"""
for x in range(xrange[0], xrange[1]):
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
with lock:
list_x = return_dict['x']
list_y = return_dict['y']
list_x.append(x)
list_y.append(y)
return_dict['x'] = list_x
return_dict['y'] = list_y


The lock here would be a manager.Lock instance you have to pass along as argument since the whole (now) locked operation is not by itself atomic. (Here
is an easier example with Manager using Lock)




This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization.




Since Python 3.6 proxy objects are nestable:




Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.




Since Python 3.6 you can fill your manager.dict before starting multiprocessing with manager.list as values and then append directly in the worker without having to reassign.



return_dict['x'] = manager.list()
return_dict['y'] = manager.list()


EDIT:



Here is the full example with Manager:



import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_util.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges

# def context_timer ... see code snippet in "Pool" section below

def worker(batch_range, return_dict, lock):
"""worker function"""
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
with lock:
return_dict['x'].append(x)
return_dict['y'].append(y)


if __name__ == '__main__':

N_WORKERS = mp.cpu_count()
X_MAX = 100000000

batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)

with Manager() as manager:
lock = manager.Lock()
return_dict = manager.dict()
return_dict['x'] = manager.list()
return_dict['y'] = manager.list()

tasks = [(batch_range, return_dict, lock)
for batch_range in batch_ranges]

with context_timer():

pool = [Process(target=worker, args=args)
for args in tasks]

for p in pool:
p.start()
for p in pool:
p.join()

# Create standard container with data from manager before exiting
# the manager.
result = k: list(v) for k, v in return_dict.items()

print(result)



Pool



Most often a multiprocessing.Pool will just do it. You have an additional challenge in your example since you want to distribute iteration over a range.
Your chunker function doesn't manage to divide the range even so every process has about the same work to do:



chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]] # 4, 4, 4, 6!


For the code below please grab the code snippet for mp_utils.py from my answer here, it provides two functions to chunk ranges as even as possible.



With multiprocessing.Pool your worker function just has to return the result and Pool will take care of transporting the result back over internal queues back to the parent process. The result will be a list, so you will have to rearange your result again in a way you want it to have. Your example could then look like this:



import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chain

from mp_utils import calc_batch_sizes, build_batch_ranges

@contextmanager
def context_timer():
start_time = time.perf_counter()
yield
end_time = time.perf_counter()
total_time = end_time-start_time
print(f'nEach iteration took: total_time / X_MAX:.4f s')
print(f'Total time: total_time:.4f sn')


def worker(batch_range):
"""worker function"""
result =
for x in batch_range:
y = ((x+5)**2+x-40)
if y <= 0xf+1:
print('Condition met at: ', y, x)
result.append((x, y))
return result


if __name__ == '__main__':

N_WORKERS = mp.cpu_count()
X_MAX = 100000000

batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
batch_ranges = build_batch_ranges(batch_sizes)
print(batch_ranges)

with context_timer():
with Pool(N_WORKERS) as pool:
results = pool.map(worker, iterable=batch_ranges)

print(f'results: results')
x, y = zip(*chain.from_iterable(results)) # filter and sort results
print(f'results sorted: x: x, y: y')


Example Output:



[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at: -15 0
Condition met at: -3 1
Condition met at: 11 2

Each iteration took: 0.0000 s
Total time: 8.2408 s

results: [[(0, -15), (1, -3), (2, 11)], , , , , , , ]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)

Process finished with exit code 0


If you had multiple arguments for your worker you would build a "tasks"-list with argument-tuples and exchange pool.map(...) with pool.starmap(...iterable=tasks). See docs for further details on that.




Process & Queue



If you can't use multiprocessing.Pool for some reason, you have to take
care of inter-process communication (IPC) yourself, by passing a
multiprocessing.Queue as argument to your worker-functions in the child-
processes and letting them enqueue their results to be send back to the
parent.



You will also have to build your Pool-like structure so you can iterate over it to start and join the processes and you have to get() the results back from the queue. More about Queue.get usage I've written up here.



A solution with this approach could look like this:



from multiprocessing import Process  # in addition to the imports of the Pool example


def worker(result_queue, batch_range):
    """Collect matching (x, y) pairs and send them to the parent via the queue."""
    result = []
    for x in batch_range:
        y = ((x+5)**2+x-40)
        if y <= 0xf+1:
            print('Condition met at: ', y, x)
            result.append((x, y))
    result_queue.put(result) # <--


if __name__ == '__main__':

    N_WORKERS = mp.cpu_count()
    X_MAX = 100000000

    result_queue = mp.Queue() # <--
    batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)
    batch_ranges = build_batch_ranges(batch_sizes)
    print(batch_ranges)

    with context_timer():

        pool = [Process(target=worker, args=(result_queue, batch_range))
                for batch_range in batch_ranges]

        for p in pool:
            p.start()

        # One get() per started process; drain the queue before joining.
        results = [result_queue.get() for _ in batch_ranges]

        for p in pool:
            p.join()

    print(f'results: {results}')
    x, y = zip(*chain.from_iterable(results))  # flatten, then split into x and y
    print(f'results sorted: x: {x}, y: {y}')
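One difference from pool.map is worth noting: results come off the queue in the order the processes put them there, which is not necessarily batch order. A possible way to restore batch order is to tag each result with its batch index, as in the sketch below. The helper names find_matches and restore_order and the small demo ranges are assumptions made for illustration.

```python
from multiprocessing import Process, Queue


def find_matches(batch_range):
    """Return all (x, y) pairs in batch_range that meet the condition."""
    result = []
    for x in batch_range:
        y = (x + 5) ** 2 + x - 40
        if y <= 0xf + 1:
            result.append((x, y))
    return result


def worker(result_queue, batch_index, batch_range):
    """Send the batch result to the parent, tagged with the batch index."""
    result_queue.put((batch_index, find_matches(batch_range)))


def restore_order(tagged_results):
    """Sort (batch_index, result) pairs by index and drop the index."""
    ordered = sorted(tagged_results, key=lambda pair: pair[0])
    return [result for _, result in ordered]


if __name__ == '__main__':
    batch_ranges = [range(0, 50), range(50, 100), range(100, 150)]
    result_queue = Queue()

    pool = [Process(target=worker, args=(result_queue, index, batch_range))
            for index, batch_range in enumerate(batch_ranges)]
    for p in pool:
        p.start()

    # Arrival order depends on timing, so collect first and sort afterwards.
    tagged = [result_queue.get() for _ in pool]
    for p in pool:
        p.join()

    print(restore_order(tagged))
```

For the data in this question only the first batch produces matches, so the ordering issue would not show up in the output, but it matters as soon as several batches return results.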






























  • I am surprised by such a comprehensive answer. Thank you so much indeed. I still cannot make it work with Manager(), so I updated the question with a few lines of code added as you recommended. Anyway, the multiprocessing.Pool() approach seems interesting. But before taking one of these approaches, I would like to know which one is better for further development of heavier tasks. For example, I am aiming to run a loop project which normally takes 10 days to run as a single process. I want to run that loop over a cluster of 4 PCs connected through a LAN and benefit from all the CPU cores they have.

    – PouJa
    Nov 14 '18 at 6:54











  • @Pouya Jamali Please see my edit. Referring to your dropped answer: it's not that you get an exception when you don't use a lock. You can just end up losing data when multiple processes happen to read the list at the same time, change it and assign it back. The last process in this group will then overwrite all changes the other processes have made, because at the time it read the list, the list wasn't yet updated with the results from the other processes. In your example just one process has results to append, so you will never observe this with this data.

    – Darkonaut
    Nov 14 '18 at 11:11











  • @Pouya Jamali Within the StdLib you would have to use Manager if you need to share data over multiple machines. But you probably should look into dask for working with a cluster.

    – Darkonaut
    Nov 14 '18 at 11:12











  • Even with this latest edit I still cannot get the 3 pairs back. Only one pair is returned which is [0] [-15]. What does contextmanager do here?

    – PouJa
    Nov 14 '18 at 12:21







  • Now it is working. Sorry, I was insisting on using my own chunker function. I cannot understand why that makes a difference!

    – PouJa
    Nov 14 '18 at 14:45
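Darkonaut's comment above about losing data without a lock can be made concrete with a small simulation. The sketch below is an assumption-laden illustration: it uses a plain dict in a single process and replays the problematic interleaving by hand, instead of running real processes against a Manager.dict, so that the outcome is deterministic.

```python
def read_modify_write(shared, key, value):
    """The three-step update: copy out, change the copy, assign it back."""
    snapshot = list(shared[key])  # 1. read
    snapshot.append(value)        # 2. modify the local copy
    shared[key] = snapshot        # 3. write back


def simulate_lost_update():
    """Two 'processes' both read before either one writes back."""
    shared = {'x': []}
    copy_a = list(shared['x'])    # A reads []
    copy_b = list(shared['x'])    # B reads [] as well
    copy_a.append(1)
    copy_b.append(2)
    shared['x'] = copy_a          # A writes [1]
    shared['x'] = copy_b          # B writes [2] and overwrites A's result
    return shared['x']


def simulate_locked_update():
    """With a lock, each three-step update finishes before the next starts."""
    shared = {'x': []}
    read_modify_write(shared, 'x', 1)
    read_modify_write(shared, 'x', 2)
    return shared['x']


if __name__ == '__main__':
    print(simulate_lost_update())    # [2]
    print(simulate_locked_update())  # [1, 2]
```

In the real multi-process case, a manager.Lock held around all three steps is what forces the second ordering.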













































edited Dec 14 '18 at 14:42

























answered Nov 14 '18 at 0:16









Darkonaut

3,2422821




3,2422821












  • I am surprised with such a comprehensive answer. Thank you so much indeed. I still cannot make it with Manager() so I updated the question with few lines of code added as you recommend. Anyways, The Multiprocessing.Pool() approach seems interesting. But before taking one of these approaches, I would like to know which one is better for further development of heavier tasks? For example, I am aiming to run a loop project which normally takes 10 days to run as a single process. I want to run that loop over a cluster of 4 PCs connected through a LAN and benefit all CPU cores that they have.

    – PouJa
    Nov 14 '18 at 6:54











  • @Pouya Jamali Please see my edit. Referring to your dropped answer: It's not that you get an exception when you don't use a lock, you just can end up with loss of data when multiple processes happen to read the list at the same time, change it and append it back. The last process out of this group will then override all changes the other processes have made because at the time he read the list, it wasn't updatet with the results from the other processes. In your example just one processes has results to append, so you will never observe it with this data.

    – Darkonaut
    Nov 14 '18 at 11:11











  • @Pouya Jamali Within the StdLib you would have to use Manager if you need to share data over multiple machines. But you probably should look into dask for working with a cluster.

    – Darkonaut
    Nov 14 '18 at 11:12











  • Even with this latest edit I still cannot get the 3 pairs back. Only one pair is returned which is [0] [-15]. What does contextmanager do here?

    – PouJa
    Nov 14 '18 at 12:21







  • 1





    Now it is working. Sorry, I was persisting to use my own chunker function. I cannot understand why does it make sense!

    – PouJa
    Nov 14 '18 at 14:45

















  • I am surprised with such a comprehensive answer. Thank you so much indeed. I still cannot make it with Manager() so I updated the question with few lines of code added as you recommend. Anyways, The Multiprocessing.Pool() approach seems interesting. But before taking one of these approaches, I would like to know which one is better for further development of heavier tasks? For example, I am aiming to run a loop project which normally takes 10 days to run as a single process. I want to run that loop over a cluster of 4 PCs connected through a LAN and benefit all CPU cores that they have.

    – PouJa
    Nov 14 '18 at 6:54











  • @Pouya Jamali Please see my edit. Referring to your dropped answer: It's not that you get an exception when you don't use a lock, you just can end up with loss of data when multiple processes happen to read the list at the same time, change it and append it back. The last process out of this group will then override all changes the other processes have made because at the time he read the list, it wasn't updatet with the results from the other processes. In your example just one processes has results to append, so you will never observe it with this data.

    – Darkonaut
    Nov 14 '18 at 11:11











  • @Pouya Jamali Within the StdLib you would have to use Manager if you need to share data over multiple machines. But you probably should look into dask for working with a cluster.

    – Darkonaut
    Nov 14 '18 at 11:12











  • Even with this latest edit I still cannot get the 3 pairs back. Only one pair is returned which is [0] [-15]. What does contextmanager do here?

    – PouJa
    Nov 14 '18 at 12:21







  • 1





    Now it is working. Sorry, I was persisting to use my own chunker function. I cannot understand why does it make sense!

    – PouJa
    Nov 14 '18 at 14:45
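For readers following the chunker and context-manager questions in the comments above, here is a minimal sketch of one way to split the range and collect results with a pool. It is not necessarily the chunker from the answer: `make_chunks`, `scan` and `main` are names invented for this sketch. It assumes half-open `(lo, hi)` chunks, which line up with `range(lo, hi)`, since `range` excludes its stop value.

```python
import multiprocessing as mp


def make_chunks(start, stop, n_chunks):
    """Split [start, stop) into n_chunks half-open (lo, hi) pairs."""
    size, extra = divmod(stop - start, n_chunks)
    chunks = []
    lo = start
    for i in range(n_chunks):
        # The first `extra` chunks take one more element each, so the
        # chunks cover the whole range with no gaps and no overlap.
        hi = lo + size + (1 if i < extra else 0)
        chunks.append((lo, hi))
        lo = hi
    return chunks


def scan(bounds):
    """Check the condition from the question on one chunk."""
    lo, hi = bounds
    hits = []
    for x in range(lo, hi):
        y = (x + 5) ** 2 + x - 40
        if y <= 0xf + 1:
            hits.append((x, y))
    # Returning the hits lets the pool send them back to the parent,
    # so no shared dict or lock is needed.
    return hits


def main(xmax=1_000_000, n_chunks=4):
    # The with-block ends the pool on exit; map() has already returned
    # every result by then.
    with mp.Pool(n_chunks) as pool:
        per_chunk = pool.map(scan, make_chunks(0, xmax, n_chunks))
    return [hit for hits in per_chunk for hit in hits]


if __name__ == "__main__":
    print(main())
```

On the context-manager question: leaving the `with` block calls `terminate()` on the pool, which is safe here because `map()` blocks until all chunks are done. `xmax` is set smaller than in the question only to keep the sketch quick to run.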















