Source code for vdrp.mphelpers

import threading
import multiprocessing.pool
import Queue
import time
import logging
import os
import sys
import copy

_logger = logging.getLogger()


# Parallelization code: we supply both a ThreadPool and a multiprocessing
# pool. Both start a given number of threads/processes that work through
# the supplied tasks until all are finished.
#
# The ThreadPool does not need to start subprocesses, but is limited by
# the Python Global Interpreter Lock (only one thread can access complex data
# types at one time). This can potentially slow things down.
#
# The MPPool needs to start up the processes, but this is only done once at
# the initialization of the pool.
#
# The MPPool processes cannot start multiprocessing jobs themselves, so if
# you need nested parallelization, either use ThreadPools for all levels, or
# use one kind of pool for one level and the other kind for the other.


class ThreadShutDownException(Exception):
    """Signal raised by a shutdown task to make a pool worker leave its loop.

    The class derives from ``Exception`` so that it can be raised and caught
    on Python 3 as well, where only ``BaseException`` subclasses are allowed
    in ``raise`` and ``except``.
    """
def shutdownThread():
    """Task that tells the worker executing it to stop.

    The pools queue one of these per worker in ``wait_completion``; the
    worker loops catch the ``ThreadShutDownException`` raised here and exit.
    """
    stop_signal = ThreadShutDownException()
    raise stop_signal
class ThreadWorker(threading.Thread):
    """Daemon thread that executes tasks pulled from a shared task queue.

    Every queue item is expected to be a ``(func, args, kargs)`` tuple. The
    worker starts itself on construction and keeps running until a task
    raises ``ThreadShutDownException`` or the queue stays empty for 120
    seconds.
    """

    def __init__(self, name, tasks):
        """Create and immediately start the worker.

        :param name: thread name, used in log and shutdown messages
        :param tasks: queue of ``(func, args, kargs)`` tuples providing
            ``get``, ``qsize`` and ``task_done``
        """
        threading.Thread.__init__(self)
        self.name = name
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        """Consume tasks until told to shut down or the queue stays empty."""
        threading.current_thread().name = self.name
        _logger.debug('Starting Thread %s' % self.name)
        while True:
            try:
                task = self.tasks.get(True, 120.0)
            except Queue.Empty:
                print('%s %s queue is empty, shutting down!'
                      % (time.strftime('%H:%M:%S'), self.name))
                return
            except Exception as e:
                _logger.exception(e)
                continue

            keep_running = True
            try:
                _logger.debug('Got new task from queue')
                _logger.debug('There are approx. %d tasks waiting'
                              % self.tasks.qsize())
                keep_running = self._run_task(task)
            except Exception as e:
                _logger.exception(e)
            finally:
                # Every dequeued item has to be acknowledged, whatever went
                # wrong while handling it; otherwise join() on the queue
                # never returns.
                self.tasks.task_done()
            if not keep_running:
                break

    def _run_task(self, task):
        """Execute one dequeued task; return False when the worker must stop."""
        try:
            func, args, kargs = task
        except (TypeError, ValueError) as e:
            # Not a (func, args, kargs) triple: report it and carry on.
            _logger.exception(e)
            return True
        try:
            func(*args, **kargs)
        except ThreadShutDownException:
            _logger.info('Shutting down thread')
            return False
        except Exception as e:
            # A failing task must not take the worker down with it.
            _logger.exception(e)
        return True
class MPWorker(multiprocessing.Process):
    """Process that executes tasks pulled from a shared joinable queue.

    Every queue item is expected to be a ``(func, args, kargs)`` tuple. The
    worker starts itself on construction and keeps running until a task
    raises ``ThreadShutDownException`` or the queue stays empty for 1200
    seconds.
    """

    def __init__(self, name, tasks):
        """Create and immediately start the worker process.

        :param name: process name, used in the shutdown message
        :param tasks: queue of ``(func, args, kargs)`` tuples providing
            ``get``, ``qsize`` and ``task_done``
        """
        multiprocessing.Process.__init__(self)
        self.name = name
        self.tasks = tasks
        self.start()

    def run(self):
        """Consume tasks until told to shut down or the queue stays empty."""
        while True:
            try:
                task = self.tasks.get(True, 1200.0)
            except Queue.Empty:
                print('%s %s queue is empty, shutting down!'
                      % (time.strftime('%H:%M:%S'), self.name))
                return
            except Exception as e:
                _logger.exception(e)
                continue

            keep_running = True
            try:
                _logger.debug('Got new task from queue')
                try:
                    backlog = self.tasks.qsize()
                except NotImplementedError:
                    # multiprocessing queues cannot report their size on
                    # some platforms (e.g. macOS); that must not cost us
                    # the task we just dequeued.
                    backlog = None
                if backlog is not None:
                    _logger.debug('There are approx. %d tasks waiting'
                                  % backlog)
                keep_running = self._run_task(task)
            except Exception as e:
                _logger.exception(e)
            finally:
                # Every dequeued item has to be acknowledged, whatever went
                # wrong while handling it; otherwise join() on the queue
                # never returns.
                self.tasks.task_done()
            if not keep_running:
                break

    def _run_task(self, task):
        """Execute one dequeued task; return False when the worker must stop."""
        try:
            func, args, kargs = task
        except (TypeError, ValueError) as e:
            # Not a (func, args, kargs) triple: report it and carry on.
            _logger.exception(e)
            return True
        try:
            func(*args, **kargs)
        except ThreadShutDownException:
            _logger.info('Shutting down thread')
            return False
        except Exception as e:
            # A failing task must not take the worker down with it.
            _logger.exception(e)
        return True
class ThreadPool:
    """Fixed-size set of ThreadWorker threads fed from one shared queue."""

    def __init__(self, num_threads):
        """Create the task queue and launch ``num_threads`` workers on it."""
        self.num_threads = num_threads
        self.tasks = Queue.Queue()
        # ThreadWorker starts itself in its constructor, so no reference to
        # the worker objects is kept here.
        for idx in range(num_threads):
            worker_name = 'ThreadWorker%d' % idx
            ThreadWorker(worker_name, self.tasks)

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)`` for execution by a worker."""
        _logger.info('Adding new task to queue')
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Queue one shutdown task per worker, then block until the queue
        has been fully processed."""
        _logger.info('Job submission complete, adding shutdown jobs')
        remaining = self.num_threads
        while remaining > 0:
            self.add_task(shutdownThread)
            remaining -= 1
        self.tasks.join()
class MPPool:
    """Fixed-size set of MPWorker processes fed from one joinable queue."""

    def __init__(self, jobnum, num_proc):
        """Create the task queue and launch ``num_proc`` worker processes.

        ``jobnum`` only ends up in the worker names
        (``MPWorker<jobnum>_<index>``).
        """
        self.num_proc = num_proc
        self.tasks = multiprocessing.JoinableQueue()
        # MPWorker starts itself in its constructor, so no reference to the
        # worker objects is kept here.
        for idx in range(num_proc):
            print('Creating mp workers')
            worker_name = 'MPWorker%d_%d' % (jobnum, idx)
            MPWorker(worker_name, self.tasks)

    def add_task(self, func, *args, **kargs):
        """Queue ``func(*args, **kargs)`` for execution by a worker."""
        _logger.info('Adding new task %s to queue' % func)
        task = (func, args, kargs)
        self.tasks.put(task)

    def wait_completion(self):
        """Queue one shutdown task per worker, then block until the queue
        has been fully processed."""
        _logger.info('Job submission complete, adding shutdown jobs')
        remaining = self.num_proc
        while remaining > 0:
            self.add_task(shutdownThread)
            remaining -= 1
        self.tasks.join()
def mp_run(func, args, rargv, parser):
    """Run ``func`` once, or once per line of a command file, then exit.

    This function never returns normally; both paths end in ``sys.exit``.

    :param func: callable invoked as ``func(number, parsed_args)``
    :param args: already parsed arguments; ``args.multi`` is either false
        or ``'<file>'`` / ``'<file>[min:max]'`` naming a command file and an
        optional slice of its lines, ``args.mcores`` is the number of worker
        threads to start
    :param rargv: base argument list handed to ``parser``
    :param parser: callable turning an argument list into the arguments
        object passed to ``func``
    :raises Exception: if the command file is missing or unreadable, or if
        the line range is not of the form ``[min:max]`` with integer bounds
    """
    if args.multi:
        # We found a -M flag with a command file, now loop over it, parse
        # the command line parameters for each call, and initialize the
        # args namespace for this call.
        mfile = args.multi.split('[')[0]
        if not os.path.isfile(mfile):
            raise Exception('%s is not a file?' % mfile)

        try:
            with open(mfile) as f:
                cmdlines = f.readlines()
        except Exception as e:
            _logger.exception(e)
            raise Exception('Failed to read input file %s!' % args.multi)

        # Parse the line numbers to evaluate, if any given.
        if '[' in args.multi:
            try:
                minl, maxl = args.multi.split('[')[1].split(']')[0].split(':')
                # Convert inside the try so that non-integer bounds are
                # reported with the same message as a malformed range.
                first, last = int(minl), int(maxl)
            except ValueError:
                raise Exception('Failed to parse line range, should be of '
                                'form [min:max]!')
            cmdlines = cmdlines[first:last]

        # Create the ThreadPool.
        pool = ThreadPool(args.mcores)

        # NOTE(review): c is never incremented, so every task is called
        # with 1 as its first argument - confirm whether a per-line counter
        # was intended.
        c = 1

        # For each command line add an entry to the ThreadPool.
        for l in cmdlines:
            largs = copy.copy(rargv)
            largs += l.split()
            main_args = parser(largs)
            pool.add_task(func, c, copy.copy(main_args))

        # Wait for all tasks to complete
        pool.wait_completion()
        sys.exit(0)
    else:
        # No command file was given, so parse the arguments (command line
        # parameters overwrite the config file) and process normally.
        args = parser(rargv)
        sys.exit(func(1, args))