Source code for propertyestimator.backends.backends

"""
Defines the base API for the property estimator task calculation backend.
"""
import re
from enum import Enum

from propertyestimator import unit


class ComputeResources:
    """An object which stores how many of each type of computational
    resource (threads or gpu's) is available to a calculation worker.

    TODO: The use of the terminology here is questionable, and is used
          interchangeably with process which may lead to some confusion.
    """

    class GPUToolkit(Enum):
        """An enumeration of the different GPU toolkits to
        make available to different calculations.
        """
        CUDA = 'CUDA'
        OpenCL = 'OpenCL'

    @property
    def number_of_threads(self):
        """int: The number of threads available to a calculation worker."""
        return self._number_of_threads

    @property
    def number_of_gpus(self):
        """int: The number of GPUs available to a calculation worker."""
        return self._number_of_gpus

    @property
    def preferred_gpu_toolkit(self):
        """ComputeResources.GPUToolkit: The preferred toolkit to use when
        running on GPUs."""
        return self._preferred_gpu_toolkit

    @property
    def gpu_device_indices(self):
        """str: The indices of the GPUs to run on. This is purely an internal
        implementation detail and should not be relied upon externally."""
        return self._gpu_device_indices

    def __init__(self, number_of_threads=1, number_of_gpus=0,
                 preferred_gpu_toolkit=None):
        """Constructs a new ComputeResources object.

        Parameters
        ----------
        number_of_threads: int
            The number of threads available to a calculation worker.
        number_of_gpus: int
            The number of GPUs available to a calculation worker.
        preferred_gpu_toolkit: ComputeResources.GPUToolkit, optional
            The preferred toolkit to use when running on GPUs. Must be
            provided whenever `number_of_gpus` is greater than zero.

        Raises
        ------
        AssertionError
            If either count is negative, if both counts are zero, or if GPUs
            are requested without a preferred toolkit.
        """

        self._number_of_threads = number_of_threads
        self._number_of_gpus = number_of_gpus

        self._preferred_gpu_toolkit = preferred_gpu_toolkit

        # A workaround for when using a local cluster backend which is
        # strictly for internal purposes only for now.
        self._gpu_device_indices = None

        # NOTE: these checks are kept as `assert` statements so that the
        # exception type seen by existing callers is unchanged; be aware
        # that they are skipped when Python runs with `-O`.
        assert self._number_of_threads >= 0
        assert self._number_of_gpus >= 0

        # A worker with neither threads nor GPUs cannot do any work.
        assert self._number_of_threads > 0 or self._number_of_gpus > 0

        if self._number_of_gpus > 0:
            assert self._preferred_gpu_toolkit is not None

    def __getstate__(self):
        """Return the state to pickle as a plain dictionary."""
        return {
            'number_of_threads': self.number_of_threads,
            'number_of_gpus': self.number_of_gpus,
            'preferred_gpu_toolkit': self.preferred_gpu_toolkit,
            '_gpu_device_indices': self._gpu_device_indices
        }

    def __setstate__(self, state):
        """Restore this object from a dictionary built by `__getstate__`."""
        self._number_of_threads = state['number_of_threads']
        self._number_of_gpus = state['number_of_gpus']
        self._preferred_gpu_toolkit = state['preferred_gpu_toolkit']
        self._gpu_device_indices = state['_gpu_device_indices']

    def __eq__(self, other):
        """Compare the thread count, GPU count and preferred toolkit.

        The internal `_gpu_device_indices` value is not part of the
        comparison.
        """
        # Objects which are not resource descriptions are handed back to
        # Python's default handling (so `resources == None` is simply False)
        # instead of failing with an AttributeError.
        if not isinstance(other, ComputeResources):
            return NotImplemented

        return (self.number_of_threads == other.number_of_threads and
                self.number_of_gpus == other.number_of_gpus and
                self.preferred_gpu_toolkit == other.preferred_gpu_toolkit)

    def __ne__(self, other):
        """Return the negation of `__eq__`."""
        are_equal = self.__eq__(other)

        # NotImplemented must be propagated untouched rather than negated.
        if are_equal is NotImplemented:
            return NotImplemented

        return not are_equal
class QueueWorkerResources(ComputeResources):
    """An extended resource object with properties specific
    to calculations which will run on queue based resources,
    such as LSF, PBS or SLURM.
    """

    @property
    def per_thread_memory_limit(self):
        """unit.Quantity: The maximum amount of memory available to each
        thread, such that the total memory limit will be
        `per_thread_memory_limit * number_of_threads`."""
        return self._per_thread_memory_limit

    @property
    def wallclock_time_limit(self):
        """str: The maximum amount of wall clock time that a worker can run
        for. This should be a string of the form `HH:MM` where HH is the
        number of hours and MM the number of minutes"""
        return self._wallclock_time_limit

    def __init__(self, number_of_threads=1, number_of_gpus=0,
                 preferred_gpu_toolkit=None,
                 per_thread_memory_limit=1*unit.gigabytes,
                 wallclock_time_limit="01:00"):
        """Constructs a new QueueWorkerResources object.

        Notes
        -----
        Both the requested `number_of_threads` and the `number_of_gpus` must
        be less than or equal to the number of threads (/cpus/cores) and GPUs
        available to each compute node in the cluster respectively, such that
        a single worker is able to be accommodated by a single compute node.

        Parameters
        ----------
        number_of_threads: int
            The number of threads available to a calculation worker.
        number_of_gpus: int
            The number of GPUs available to a calculation worker.
        preferred_gpu_toolkit: ComputeResources.GPUToolkit, optional
            The preferred toolkit to use when running on GPUs.
        per_thread_memory_limit: unit.Quantity
            The maximum amount of memory available to each thread.
        wallclock_time_limit: str
            The maximum amount of wall clock time that a worker can run for.
            This should be a string of the form `HH:MM` where HH is the
            number of hours and MM the number of minutes

        Raises
        ------
        AssertionError
            If the memory limit is missing, is not a positive quantity with
            units compatible with bytes, or if the wall clock limit is not a
            non-empty string starting with two digits, a colon and two
            digits.
        """

        super().__init__(number_of_threads, number_of_gpus,
                         preferred_gpu_toolkit)

        self._per_thread_memory_limit = per_thread_memory_limit
        self._wallclock_time_limit = wallclock_time_limit

        assert self._per_thread_memory_limit is not None

        # The limit must be a quantity whose base units match those of bytes.
        assert (isinstance(self._per_thread_memory_limit, unit.Quantity) and
                unit.get_base_units(unit.byte)[-1] ==
                unit.get_base_units(self._per_thread_memory_limit.units)[-1])

        assert self._per_thread_memory_limit > 0 * unit.byte

        assert wallclock_time_limit is not None
        assert isinstance(wallclock_time_limit, str) and len(wallclock_time_limit) > 0

        # NOTE: `match` only anchors the start of the string, so any text
        # which follows the leading `dd:dd` is accepted as well.
        wallclock_pattern = re.compile(r'\d\d:\d\d')
        assert wallclock_pattern.match(wallclock_time_limit) is not None

    def __getstate__(self):
        """Return the state to pickle, extending the base class state with
        the queue specific limits."""
        base_dict = super(QueueWorkerResources, self).__getstate__()

        # Store the actual limits (not the thread count) so that an object
        # restored by `__setstate__` matches the one which was pickled.
        base_dict.update({
            'per_thread_memory_limit': self.per_thread_memory_limit,
            'wallclock_time_limit': self.wallclock_time_limit,
        })

        return base_dict

    def __setstate__(self, state):
        """Restore this object from a dictionary built by `__getstate__`."""
        super(QueueWorkerResources, self).__setstate__(state)

        self._per_thread_memory_limit = state['per_thread_memory_limit']
        self._wallclock_time_limit = state['wallclock_time_limit']

    def __eq__(self, other):
        """Compare the base resources and both of the queue limits."""
        if not isinstance(other, QueueWorkerResources):
            # A plain ComputeResources has no queue limits and so is never
            # equal to this object; any unrelated type is handed back to
            # Python's default handling.
            return False if isinstance(other, ComputeResources) else NotImplemented

        return (super(QueueWorkerResources, self).__eq__(other) and
                self.per_thread_memory_limit == other.per_thread_memory_limit and
                self.wallclock_time_limit == other.wallclock_time_limit)

    def __ne__(self, other):
        """Return the negation of `__eq__`."""
        are_equal = self.__eq__(other)

        # NotImplemented must be propagated untouched rather than negated.
        if are_equal is NotImplemented:
            return NotImplemented

        return not are_equal
class PropertyEstimatorBackend:
    """An abstract base representation of a property estimator backend.

    A backend is responsible for coordinating, distributing and running
    calculations on the available hardware. This may range from a single
    machine to a multinode cluster, but *not* across multiple clusters or
    physical locations.

    Notes
    -----
    All estimator backend classes must inherit from this class, and must
    implement the `start`, `stop`, and `submit_task` method.
    """

    def __init__(self, number_of_workers=1, resources_per_worker=None):
        """Constructs a new PropertyEstimatorBackend object.

        Parameters
        ----------
        number_of_workers : int
            The number of workers to run the calculations on. One worker
            can perform a single task (e.g run a simulation) at once.
        resources_per_worker: ComputeResources, optional
            The resources to request per worker. When omitted, a new
            default constructed `ComputeResources` object is used.
        """

        # Build the default here rather than in the signature: a default
        # argument is evaluated once, so every backend would otherwise
        # share (and could modify) the very same resources object.
        if resources_per_worker is None:
            resources_per_worker = ComputeResources()

        self._number_of_workers = number_of_workers
        self._resources_per_worker = resources_per_worker

    def _get_worker_resources_dict(self):
        """Get dict representation of the resources requested by a worker.

        Returns
        -------
        dict of str and int
            The requested thread count under `'number_of_threads'` and the
            requested GPU count under `'number_of_gpus'`.
        """
        return {
            'number_of_threads': self._resources_per_worker.number_of_threads,
            'number_of_gpus': self._resources_per_worker.number_of_gpus,
        }

    def start(self):
        """Start the calculation backend.

        The base implementation does nothing.
        """
        pass

    def stop(self):
        """Stop the calculation backend.

        The base implementation does nothing.
        """
        pass

    def submit_task(self, function, *args, **kwargs):
        """Submit a task to the compute resources
        managed by this backend.

        The base implementation does nothing and returns None; subclasses
        are expected to provide the behaviour described here.

        Parameters
        ----------
        function: function
            The function to run.

        Returns
        -------
        Future
            Returns a future object which will eventually point to the
            results of the submitted task.
        """
        pass