Source code for pyexperiment.utils.DelegateCall

"""Provides a multiprocessing-safe way to aggregate results from
multiple function calls.

In multi-process programs, it is often useful to delegate a
function call - e.g., writing a log message to a file - to another
process to avoid conflicts. :class:`pyexperiment.utils.DelegateCall`
implements a functor that, when called, passes the argument data to a
function running in a thread of the process that created the
DelegateCall object. The callback itself must be thread-safe though.

Written by Peter Duerr

"""
from __future__ import print_function
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import

import sys
import multiprocessing
import threading
import traceback


class DelegateCall(object):  # pylint: disable=too-few-public-methods
    """Helper class that provides a multiprocessing-safe way to
    aggregate results from multiple function calls.

    The argument to the __call__ function is passed through a
    multiprocessing.JoinableQueue to the process where the class was
    initialized (i.e., the argument must be serializable). There, a
    daemon thread takes items off the queue one at a time and hands
    each of them to the callback.
    """

    def __init__(self, callback):
        """Initializer, takes a callback that processes the received
        data in the original process.

        The callback is invoked with a single positional argument (the
        data passed to __call__) from a background daemon thread that
        is started here.
        """
        # The callback
        self.callback = callback

        # The queue that aggregates the data
        self._queue = multiprocessing.JoinableQueue(-1)

        # The thread that processes it; daemonized so that it does not
        # keep the interpreter alive on exit
        processor_thread = threading.Thread(target=self._receive)
        processor_thread.daemon = True
        processor_thread.start()

    def __call__(self, data):
        """Send data, can be called from any process
        """
        self._queue.put_nowait(data)

    def join(self):
        """Block until every item sent so far has been handled by the
        callback (whether the callback succeeded or raised).

        Returns the result of the underlying queue's join(), i.e. None.
        """
        return self._queue.join()

    def _receive(self):
        """Loops indefinitely receiving calls from the queue

        Exceptions raised by the callback are printed to stderr and do
        not stop the loop; an EOFError ends the loop; KeyboardInterrupt
        and SystemExit are re-raised.
        """
        while True:
            # Tracks whether an item was actually taken off the queue,
            # so that task_done() is called exactly once per item.
            received = False
            try:
                data = self._queue.get()
                received = True
                self.callback(data)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            # This should really catch every other exception!
            except Exception:  # pylint: disable=broad-except
                traceback.print_exc(file=sys.stderr)
            finally:
                # Mark the item as done even if the callback failed;
                # otherwise join() would block forever after the first
                # failing callback.
                if received:
                    self._queue.task_done()