"""
A collection of openff-evaluator compute backends which use dask as the distribution engine.
"""
import abc
import importlib
import logging
import multiprocessing
import os
import platform
import shutil
import traceback
import dask
from dask import distributed
from distributed import get_worker
from openff import evaluator
from openff.evaluator.backends import (
CalculationBackend,
ComputeResources,
QueueWorkerResources,
)
from openff.evaluator.utils.utils import timestamp_formatter
logger = logging.getLogger(__name__)
class _Multiprocessor:
"""A temporary utility class which runs a given
function in a separate process.
"""
@staticmethod
def _wrapper(func, queue, args, kwargs):
"""A wrapper around the function to run in a separate
process, which sets up logging and handles any extra
module loading.
Parameters
----------
func: function
The function to run in this process.
queue: Queue
The queue used to pass the results back
to the parent process.
args: tuple
The args to pass to the function
kwargs: dict
The kwargs to pass to the function
"""
try:
from openff.evaluator.workflow.plugins import registered_workflow_protocols
# Each spun up worker doesn't automatically import
# all of the modules which were imported in the main
# launch script, and as such custom plugins will no
# longer be registered. We re-import / register them
# here.
if "registered_workflow_protocols" in kwargs:
protocols_to_import = kwargs.pop("registered_workflow_protocols")
for protocol_class in protocols_to_import:
module_name = ".".join(protocol_class.split(".")[:-1])
class_name = protocol_class.split(".")[-1]
imported_module = importlib.import_module(module_name)
registered_workflow_protocols[class_name] = getattr(
imported_module, class_name
)
if "logger_path" in kwargs:
formatter = timestamp_formatter()
logger_path = kwargs.pop("logger_path")
worker_logger = logging.getLogger()
if not len(worker_logger.handlers):
logger_handler = logging.FileHandler(logger_path)
logger_handler.setFormatter(formatter)
worker_logger.setLevel(logging.INFO)
worker_logger.addHandler(logger_handler)
if (
not os.path.exists(logger_path)
or os.stat(logger_path).st_size == 0
):
worker_logger.info("=========================================")
worker_logger.info(f"HOSTNAME: {platform.node()}")
if os.environ.get("PBS_JOBID") is not None:
worker_logger.info(
f"PBSJOBID: {os.environ.get('PBS_JOBID')}"
)
elif os.environ.get("LSB_JOBID") is not None:
worker_logger.info(
f"LSBJOBID: {os.environ.get('LSB_JOBID')}"
)
elif os.environ.get("SLURM_JOB_ID") is not None:
worker_logger.info(
f"SLURMJOBID: {os.environ.get('SLURM_JOBID')}"
)
worker_logger.info(f"PLATFORM: {platform.platform()}")
worker_logger.info("-----------------------------------------")
worker_logger.info(
"PYTHON VERSION: "
f"{platform.python_version()} - "
f"{platform.python_implementation()}"
)
worker_logger.info("=========================================")
return_value = func(*args, **kwargs)
queue.put(return_value)
except Exception as e:
queue.put((e, e.__traceback__))
@staticmethod
def run(function, *args, **kwargs):
"""Runs a functions in its own process.
Parameters
----------
function: function
The function to run.
args: Any
The arguments to pass to the function.
kwargs: Any
The keyword arguments to pass to the function.
Returns
-------
Any
The result of the function
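Examples
--------
A minimal sketch of running a simple function in a child process
(the return value is passed back through a queue):
>>> _Multiprocessor.run(sum, [1, 2, 3])
6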
"""
# An unpleasant way to ensure that codecov works correctly
# when testing on GHA.
if hasattr(evaluator, "_called_from_test"):
return function(*args, **kwargs)
# queue = multiprocessing.Queue()
manager = multiprocessing.Manager()
queue = manager.Queue()
target_args = [function, queue, args, kwargs]
process = multiprocessing.Process(
target=_Multiprocessor._wrapper, args=target_args
)
process.start()
return_value = queue.get()
process.join()
if (
isinstance(return_value, tuple)
and len(return_value) > 0
and isinstance(return_value[0], Exception)
):
formatted_exception = traceback.format_exception(
None, return_value[0], return_value[1]
)
logger.info(f"{formatted_exception} {return_value[0]} {return_value[1]}")
raise return_value[0]
return return_value
class BaseDaskBackend(CalculationBackend, abc.ABC):
"""A base `dask` backend class, which implements functionality
which is common to all other `dask` based backends.
"""
def __init__(self, number_of_workers=1, resources_per_worker=ComputeResources()):
"""Constructs a new BaseDaskBackend object."""
super().__init__(number_of_workers, resources_per_worker)
self._cluster = None
self._client = None
def start(self):
super(BaseDaskBackend, self).start()
self._client = distributed.Client(self._cluster)
def stop(self):
if self._client is not None:
self._client.close()
if self._cluster is not None:
self._cluster.close()
if os.path.isdir("dask-worker-space"):
shutil.rmtree("dask-worker-space")
@staticmethod
def _wrapped_function(function, *args, **kwargs):
"""A function which is wrapped around any function submitted via
`submit_task`, which adds extra metadata to the args and kwargs
(such as the compute resources available to the function) and may
perform extra validation before the function is passed to `dask`.
Parameters
----------
function: function
The function which will be executed by dask.
args: Any
The list of args to pass to the function.
kwargs: Any
The kwargs to pass to the function.
Returns
-------
Any
Returns the output of the function without modification, unless
an uncaught exception is raised, in which case an EvaluatorException
is returned.
"""
raise NotImplementedError()
class BaseDaskJobQueueBackend(BaseDaskBackend):
"""An openff-evaluator backend which uses a `dask_jobqueue.JobQueueCluster`
object to run calculations within an existing HPC queuing system.
See Also
--------
dask_jobqueue.JobQueueCluster
"""
def __init__(
self,
minimum_number_of_workers=1,
maximum_number_of_workers=1,
resources_per_worker=QueueWorkerResources(),
queue_name="default",
setup_script_commands=None,
extra_script_options=None,
adaptive_interval="10000ms",
disable_nanny_process=False,
cluster_type=None,
adaptive_class=None,
):
"""Constructs a new BaseDaskJobQueueBackend object
Parameters
----------
minimum_number_of_workers: int
The minimum number of workers to request from the queue system.
maximum_number_of_workers: int
The maximum number of workers to request from the queue system.
resources_per_worker: QueueWorkerResources
The resources to request per worker.
queue_name: str
The name of the queue which the workers will be requested
from.
setup_script_commands: list of str
A list of bash script commands to call within the queue submission
script before the call to launch the dask worker.
This may include activating a python environment, or loading
an environment module
extra_script_options: list of str
A list of extra job specific options to include in the queue
submission script. These will get added to the script header in the form
#BSUB <extra_script_options[x]>
adaptive_interval: str
The interval between attempting to either scale up or down
the cluster, of the form 'XXXms'.
disable_nanny_process: bool
If True, dask workers will be started in `--no-nanny` mode. This
is required if using multiprocessing code within submitted tasks.
This has not been fully tested yet and may lead to stability issues
with the workers.
cluster_type: str
The type of job queue system to submit to (e.g. 'lsf', 'pbs' or
'slurm'), used to look up the corresponding `dask_jobqueue`
configuration settings. This must be provided by subclasses.
adaptive_class: class of type `distributed.deploy.AdaptiveCore`, optional
An optional class to pass to dask to use for its adaptive
scaling handling. This is mainly exposed to allow easily working around
certain dask bugs / quirks.
"""
super().__init__(minimum_number_of_workers, resources_per_worker)
assert isinstance(resources_per_worker, QueueWorkerResources)
assert minimum_number_of_workers <= maximum_number_of_workers
assert cluster_type is not None
if resources_per_worker.number_of_gpus > 0:
if resources_per_worker.number_of_gpus > 1:
raise ValueError("Only one GPU per worker is currently supported.")
# For now we need to set this to some high number to ensure
# jobs restarting because of workers being killed (due to
# wall-clock time limits mainly) do not get terminated. This
# should mostly be safe as we mostly wrap genuinely thrown
# exceptions up as EvaluatorExceptions and return these
# gracefully (such that the task won't be marked as failed by
# dask).
dask.config.set({"distributed.scheduler.allowed-failures": 500})
self._minimum_number_of_workers = minimum_number_of_workers
self._maximum_number_of_workers = maximum_number_of_workers
self._queue_name = queue_name
self._setup_script_commands = setup_script_commands
self._extra_script_options = extra_script_options
self._adaptive_interval = adaptive_interval
self._adaptive_class = adaptive_class
self._disable_nanny_process = disable_nanny_process
self._cluster_type = cluster_type
def _get_env_extra(self):
"""Returns a list of extra commands to run before
the dask worker is started.
Returns
-------
list of str
The extra commands to run.
"""
env_extra = dask.config.get(
f"jobqueue.{self._cluster_type}.job-script-prologue", default=[]
)
if self._setup_script_commands is not None:
env_extra.extend(self._setup_script_commands)
return env_extra
def _get_job_extra(self):
"""Returns a list of extra options to add to the
worker job script header lines.
Returns
-------
list of str
The extra header options to add.
"""
job_extra = dask.config.get(
f"jobqueue.{self._cluster_type}.job-extra-directives", default=[]
)
if self._extra_script_options is not None:
job_extra.extend(self._extra_script_options)
return job_extra
def _get_cluster_class(self):
"""Returns the type of `dask_jobqueue.JobQueueCluster` to
create.
Returns
-------
class
The class of cluster to create.
"""
raise NotImplementedError()
def _get_extra_cluster_kwargs(self):
"""Returns a dictionary of extra kwargs to pass to the cluster.
Returns
-------
dict of str and Any
The kwargs dictionary to pass.
"""
return {}
def job_script(self):
"""Returns the job script that dask will use to submit workers.
The backend must be started before calling this function.
Returns
-------
str
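Examples
--------
A rough sketch of typical usage, assuming `backend` is an already
constructed job queue backend such as a `DaskLSFBackend`:
>>> backend.start()
>>> print(backend.job_script())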
"""
if self._cluster is None:
raise ValueError(
"The cluster is not initialized. This is usually"
"caused by calling `job_script` before `start`."
)
return self._cluster.job_script()
def start(self):
requested_memory = self._resources_per_worker.per_thread_memory_limit
memory_string = f"{requested_memory.to(evaluator.unit.byte):~}".replace(" ", "")
job_extra = self._get_job_extra()
env_extra = self._get_env_extra()
extra = None if not self._disable_nanny_process else ["--no-nanny"]
cluster_class = self._get_cluster_class()
self._cluster = cluster_class(
queue=self._queue_name,
cores=self._resources_per_worker.number_of_threads,
memory=memory_string,
walltime=self._resources_per_worker.wallclock_time_limit,
job_extra=job_extra,
env_extra=env_extra,
extra=extra,
local_directory="dask-worker-space",
**self._get_extra_cluster_kwargs(),
)
# The very small target duration is an attempt to force dask to scale
# based on the number of processing tasks per worker.
extra_kwargs = {}
if self._adaptive_class is not None:
extra_kwargs["Adaptive"] = self._adaptive_class
self._cluster.adapt(
minimum=self._minimum_number_of_workers,
maximum=self._maximum_number_of_workers,
interval=self._adaptive_interval,
target_duration="0.00000000001s",
**extra_kwargs,
)
super(BaseDaskJobQueueBackend, self).start()
@staticmethod
def _wrapped_function(function, *args, **kwargs):
available_resources = kwargs["available_resources"]
per_worker_logging = kwargs.pop("per_worker_logging")
gpu_assignments = kwargs.pop("gpu_assignments")
# Set up the logging per worker if the flag is set to True.
if per_worker_logging:
# Each worker should have its own log file.
os.makedirs("worker-logs", exist_ok=True)
kwargs["logger_path"] = os.path.join(
"worker-logs", f"{get_worker().id}.log"
)
if available_resources.number_of_gpus > 0:
worker_id = distributed.get_worker().id
available_resources._gpu_device_indices = (
"0" if worker_id not in gpu_assignments else gpu_assignments[worker_id]
)
logger.info(
f"Launching a job with access to GPUs "
f"{available_resources._gpu_device_indices}"
)
return_value = _Multiprocessor.run(function, *args, **kwargs)
return return_value
# return function(*args, **kwargs)
def submit_task(self, function, *args, **kwargs):
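"""Submits a function to the dask client, wrapping it so that the
per-worker compute resources, per-worker logging settings and the
set of registered workflow protocols are made available to the
worker which executes it.
Parameters
----------
function: function
The function to run on a worker.
key: str, optional
An optional key to assign to the submitted dask task.
Returns
-------
distributed.Future
A future which will eventually contain the result of the function.
"""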
from openff.evaluator.workflow.plugins import registered_workflow_protocols
key = kwargs.pop("key", None)
protocols_to_import = [
protocol_class.__module__ + "." + protocol_class.__qualname__
for protocol_class in registered_workflow_protocols.values()
]
return self._client.submit(
BaseDaskJobQueueBackend._wrapped_function,
function,
*args,
**kwargs,
available_resources=self._resources_per_worker,
registered_workflow_protocols=protocols_to_import,
gpu_assignments={},
per_worker_logging=True,
key=key,
)
class DaskLSFBackend(BaseDaskJobQueueBackend):
"""An openff-evaluator backend which uses a `dask_jobqueue.LSFCluster`
object to run calculations within an existing LSF queue.
See Also
--------
dask_jobqueue.LSFCluster
DaskPBSBackend
"""
def __init__(
self,
minimum_number_of_workers=1,
maximum_number_of_workers=1,
resources_per_worker=QueueWorkerResources(),
queue_name="default",
setup_script_commands=None,
extra_script_options=None,
adaptive_interval="10000ms",
disable_nanny_process=False,
adaptive_class=None,
):
"""Constructs a new DaskLSFBackend object
Examples
--------
To create an LSF queueing compute backend which will attempt to spin up
workers which have access to a single GPU.
>>> # Create a resource object which will request a worker with
>>> # one gpu which will stay alive for five hours.
>>> from openff.evaluator.backends import QueueWorkerResources
>>>
>>> resources = QueueWorkerResources(number_of_threads=1,
>>> number_of_gpus=1,
>>> preferred_gpu_toolkit=QueueWorkerResources.GPUToolkit.CUDA,
>>> preferred_gpu_precision=QueueWorkerResources.GPUPrecision.mixed,
>>> wallclock_time_limit='05:00')
>>>
>>> # Define the set of commands which will set up the correct environment
>>> # for each of the workers.
>>> setup_script_commands = [
>>> 'module load cuda/9.2',
>>> ]
>>>
>>> # Define extra options to only run on certain node groups
>>> extra_script_options = [
>>> '-m "ls-gpu lt-gpu"'
>>> ]
>>>
>>>
>>> # Create the backend which will adaptively try to spin up between one and
>>> # ten workers with the requested resources depending on the calculation load.
>>> from openff.evaluator.backends.dask import DaskLSFBackend
>>>
>>> lsf_backend = DaskLSFBackend(minimum_number_of_workers=1,
>>> maximum_number_of_workers=10,
>>> resources_per_worker=resources,
>>> queue_name='gpuqueue',
>>> setup_script_commands=setup_script_commands,
>>> extra_script_options=extra_script_options)
"""
super().__init__(
minimum_number_of_workers,
maximum_number_of_workers,
resources_per_worker,
queue_name,
setup_script_commands,
extra_script_options,
adaptive_interval,
disable_nanny_process,
cluster_type="lsf",
adaptive_class=adaptive_class,
)
def _get_job_extra(self):
job_extra = super(DaskLSFBackend, self)._get_job_extra()
if self._resources_per_worker.number_of_gpus > 0:
job_extra.append(
"-gpu num={}:j_exclusive=yes:mode=shared:mps=no:".format(
self._resources_per_worker.number_of_gpus
)
)
return job_extra
def _get_extra_cluster_kwargs(self):
requested_memory = self._resources_per_worker.per_thread_memory_limit
memory_bytes = requested_memory.to(evaluator.unit.byte).magnitude
extra_kwargs = super(DaskLSFBackend, self)._get_extra_cluster_kwargs()
extra_kwargs.update({"mem": memory_bytes})
return extra_kwargs
def _get_cluster_class(self):
from dask_jobqueue import LSFCluster
return LSFCluster
class DaskPBSBackend(BaseDaskJobQueueBackend):
"""An openff-evaluator backend which uses a `dask_jobqueue.PBSCluster`
object to run calculations within an existing PBS queue.
See Also
--------
dask_jobqueue.PBSCluster
DaskLSFBackend
"""
def __init__(
self,
minimum_number_of_workers=1,
maximum_number_of_workers=1,
resources_per_worker=QueueWorkerResources(),
queue_name="default",
setup_script_commands=None,
extra_script_options=None,
adaptive_interval="10000ms",
disable_nanny_process=False,
resource_line=None,
adaptive_class=None,
):
"""Constructs a new DaskLSFBackend object
Parameters
----------
resource_line: str
The string to pass to the `#PBS -l` line.
Examples
--------
To create a PBS queueing compute backend which will attempt to spin up
workers which have access to a single GPU.
>>> # Create a resource object which will request a worker with
>>> # one gpu which will stay alive for five hours.
>>> from openff.evaluator.backends import QueueWorkerResources
>>>
>>> resources = QueueWorkerResources(number_of_threads=1,
>>> number_of_gpus=1,
>>> preferred_gpu_toolkit=QueueWorkerResources.GPUToolkit.CUDA,
>>> preferred_gpu_precision=QueueWorkerResources.GPUPrecision.mixed,
>>> wallclock_time_limit='05:00')
>>>
>>> # Define the set of commands which will set up the correct environment
>>> # for each of the workers.
>>> setup_script_commands = [
>>> 'module load cuda/9.2',
>>> ]
>>>
>>> # Create the backend which will adaptively try to spin up between one and
>>> # ten workers with the requested resources depending on the calculation load.
>>> from openff.evaluator.backends.dask import DaskPBSBackend
>>>
>>> pbs_backend = DaskPBSBackend(minimum_number_of_workers=1,
>>> maximum_number_of_workers=10,
>>> resources_per_worker=resources,
>>> queue_name='gpuqueue',
>>> setup_script_commands=setup_script_commands)
"""
super().__init__(
minimum_number_of_workers,
maximum_number_of_workers,
resources_per_worker,
queue_name,
setup_script_commands,
extra_script_options,
adaptive_interval,
disable_nanny_process,
cluster_type="pbs",
adaptive_class=adaptive_class,
)
self._resource_line = resource_line
def _get_extra_cluster_kwargs(self):
extra_kwargs = super(DaskPBSBackend, self)._get_extra_cluster_kwargs()
extra_kwargs.update({"resource_spec": self._resource_line})
return extra_kwargs
def _get_cluster_class(self):
from dask_jobqueue import PBSCluster
return PBSCluster
class DaskSLURMBackend(BaseDaskJobQueueBackend):
"""An openff-evaluator backend which uses a `dask_jobqueue.SLURMCluster`
object to run calculations within an existing SLURM queue.
See Also
--------
dask_jobqueue.SLURMCluster
DaskLSFBackend
DaskPBSBackend
"""
def __init__(
self,
minimum_number_of_workers=1,
maximum_number_of_workers=1,
resources_per_worker=QueueWorkerResources(),
queue_name="default",
setup_script_commands=None,
extra_script_options=None,
adaptive_interval="10000ms",
disable_nanny_process=False,
adaptive_class=None,
):
"""
Examples
--------
To create a SLURM queueing compute backend which will attempt to spin up
workers which have access to a single GPU.
>>> # Create a resource object which will request a worker with
>>> # one gpu which will stay alive for five hours.
>>> from openff.evaluator.backends import QueueWorkerResources
>>>
>>> resources = QueueWorkerResources(number_of_threads=1,
>>> number_of_gpus=1,
>>> preferred_gpu_toolkit=QueueWorkerResources.GPUToolkit.CUDA,
>>> preferred_gpu_precision=QueueWorkerResources.GPUPrecision.mixed,
>>> wallclock_time_limit='05:00')
>>>
>>> # Define the set of commands which will set up the correct environment
>>> # for each of the workers.
>>> setup_script_commands = [
>>> 'module load cuda/9.2',
>>> ]
>>>
>>> # Create the backend which will adaptively try to spin up between one and
>>> # ten workers with the requested resources depending on the calculation load.
>>> from openff.evaluator.backends.dask import DaskSLURMBackend
>>>
>>> slurm_backend = DaskSLURMBackend(minimum_number_of_workers=1,
>>> maximum_number_of_workers=10,
>>> resources_per_worker=resources,
>>> queue_name='gpuqueue',
>>> setup_script_commands=setup_script_commands)
"""
super().__init__(
minimum_number_of_workers,
maximum_number_of_workers,
resources_per_worker,
queue_name,
setup_script_commands,
extra_script_options,
adaptive_interval,
disable_nanny_process,
cluster_type="slurm",
adaptive_class=adaptive_class,
)
def _get_cluster_class(self):
from dask_jobqueue import SLURMCluster
return SLURMCluster
class DaskLocalCluster(BaseDaskBackend):
"""An openff-evaluator backend which uses a `dask` `LocalCluster`
object to run calculations on a single machine.
See Also
--------
distributed.LocalCluster
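Examples
--------
A rough sketch of typical usage (the resource values are illustrative
only):
>>> from openff.evaluator.backends import ComputeResources
>>> from openff.evaluator.backends.dask import DaskLocalCluster
>>>
>>> resources = ComputeResources(number_of_threads=1)
>>>
>>> local_backend = DaskLocalCluster(number_of_workers=2,
>>> resources_per_worker=resources)
>>> local_backend.start()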
"""
def __init__(self, number_of_workers=1, resources_per_worker=ComputeResources()):
"""Constructs a new DaskLocalCluster"""
super().__init__(number_of_workers, resources_per_worker)
self._gpu_device_indices_by_worker = {}
maximum_threads = multiprocessing.cpu_count()
requested_threads = number_of_workers * resources_per_worker.number_of_threads
if requested_threads > maximum_threads:
raise ValueError(
"The total number of requested threads ({})is greater than is available on the"
"machine ({})".format(requested_threads, maximum_threads)
)
if resources_per_worker.number_of_gpus > 0:
if resources_per_worker.number_of_gpus > 1:
raise ValueError("Only one GPU per worker is currently supported.")
visible_devices = os.environ.get("CUDA_VISIBLE_DEVICES")
if visible_devices is None:
raise ValueError("The CUDA_VISIBLE_DEVICES variable is empty.")
gpu_device_indices = visible_devices.split(",")
if len(gpu_device_indices) != number_of_workers:
raise ValueError(
"The number of available GPUs {} must match "
"the number of requested workers {}."
)
def start(self):
self._cluster = distributed.LocalCluster(
name=None,
n_workers=self._number_of_workers,
threads_per_worker=1,
processes=False,
)
if self._resources_per_worker.number_of_gpus > 0:
if isinstance(self._cluster.workers, dict):
for index, worker in self._cluster.workers.items():
self._gpu_device_indices_by_worker[worker.id] = str(index)
else:
for index, worker in enumerate(self._cluster.workers):
self._gpu_device_indices_by_worker[worker.id] = str(index)
super(DaskLocalCluster, self).start()
@staticmethod
def _wrapped_function(function, *args, **kwargs):
available_resources = kwargs["available_resources"]
gpu_assignments = kwargs.pop("gpu_assignments")
if available_resources.number_of_gpus > 0:
worker_id = distributed.get_worker().id
available_resources._gpu_device_indices = gpu_assignments[worker_id]
logger.info(
"Launching a job with access to GPUs {}".format(
gpu_assignments[worker_id]
)
)
return_value = _Multiprocessor.run(function, *args, **kwargs)
return return_value
# return function(*args, **kwargs)
def submit_task(self, function, *args, **kwargs):
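"""Submits a function to the dask client, wrapping it so that the
per-worker compute resources and the GPU assignments of each worker
are made available to the worker which executes it.
Parameters
----------
function: function
The function to run on a worker.
key: str, optional
An optional key to assign to the submitted dask task.
Returns
-------
distributed.Future
A future which will eventually contain the result of the function.
"""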
key = kwargs.pop("key", None)
return self._client.submit(
DaskLocalCluster._wrapped_function,
function,
*args,
**kwargs,
key=key,
available_resources=self._resources_per_worker,
gpu_assignments=self._gpu_device_indices_by_worker,
)