Dask Kubernetes Backend
The framework implements a special set of calculation backends which integrate with the Dask distributed and dask-kubernetes libraries.
These backends are designed to run on the National Research Platform (NRP) and have not been tested on other platforms.
Several separate components are required for executing Evaluator on NRP due to the limited user permissions we have:
- a shared filesystem that is accessible by both the EvaluatorServer and the DaskKubernetesBackend. Typically this is constructed with a PersistentVolumeClaim (PVC).
- a DaskKubernetesBackend that can submit tasks to the Kubernetes cluster. This must be started locally, with access to the NRP cluster, and must have the PVC mounted.
- an EvaluatorServer, running remotely in a deployment on NRP, that can receive tasks from the local EvaluatorClient. It needs to connect to the DaskKubernetesBackend to submit tasks to the Kubernetes cluster. If permissions are limited, as they are on NRP, you may not be able to create the DaskKubernetesBackend remotely; in that case, you will need a DaskKubernetesExistingBackend to connect to an existing KubeCluster.
- the EvaluatorServer port, forwarded so that a local EvaluatorClient can communicate with the EvaluatorServer.
PersistentVolumeClaims in Python
A PVC can be constructed with this tutorial, or dynamically through Python using the Kubernetes client:
import logging
import time

from kubernetes import client, config
from openff.units import unit

logger = logging.getLogger(__name__)


def create_pvc(
    job_name: str,
    namespace: str = "openforcefield",
    storage_space=1 * unit.gigabytes,  # required space to request
    timeout: float = 1000,
) -> str:
    """Create a ReadWriteMany PVC and wait until it is Bound."""
    # load credentials from the local kubeconfig (e.g. ~/.kube/config)
    config.load_kube_config()
    core_v1 = client.CoreV1Api()

    # from https://ucsd-prp.gitlab.io/userdocs/storage/ceph/#currently-available-storageclasses
    storage_class_name = "rook-cephfs-central"

    pvc_spec = client.V1PersistentVolumeClaimSpec(
        access_modes=["ReadWriteMany"],
        storage_class_name=storage_class_name,
        resources=client.V1ResourceRequirements(
            requests={
                "storage": f"{storage_space.to(unit.gigabytes).m}Gi",
            }
        ),
    )
    pvc_name = f"evaluator-storage-{job_name}"
    metadata = client.V1ObjectMeta(name=pvc_name)
    pvc = client.V1PersistentVolumeClaim(
        api_version="v1",
        kind="PersistentVolumeClaim",
        metadata=metadata,
        spec=pvc_spec,
    )
    api_response = core_v1.create_namespaced_persistent_volume_claim(
        namespace=namespace,
        body=pvc,
    )
    logger.info(
        f"Created PVC {pvc.metadata.name}. State={api_response.status.phase}"
    )

    # wait for PVC to bind
    end_time = time.time() + timeout
    while time.time() < end_time:
        pvc = core_v1.read_namespaced_persistent_volume_claim(
            name=pvc_name, namespace=namespace
        )
        if pvc.status.phase == "Bound":
            logger.info(f"PVC '{pvc_name}' is Bound.")
            return pvc_name
        logger.info(
            f"Waiting for PVC '{pvc_name}' to become Bound. Current phase: {pvc.status.phase}"
        )
        time.sleep(5)
    raise TimeoutError(f"PVC '{pvc_name}' was not Bound within {timeout} seconds.")
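The bind-polling loop above can be factored into a small helper that is easy to test without a cluster. This is a minimal sketch rather than part of Evaluator: the name wait_for_phase is hypothetical, and the sleep and clock parameters exist only so the loop can be exercised with fakes. It uses time.monotonic so that changes to the system clock cannot shorten or extend the timeout.

```python
import time
from typing import Callable, Optional


def wait_for_phase(
    read_phase: Callable[[], Optional[str]],
    target: str = "Bound",
    timeout: float = 1000.0,
    interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll read_phase() until it returns `target`, else raise TimeoutError.

    Hypothetical helper: read_phase is any zero-argument callable that
    returns the current phase string of the resource being watched.
    """
    end_time = clock() + timeout
    phase = None
    while clock() < end_time:
        phase = read_phase()
        if phase == target:
            return phase
        # not ready yet: wait before asking the API server again
        sleep(interval)
    raise TimeoutError(
        f"phase did not reach {target!r} within {timeout} s (last seen: {phase!r})"
    )
```

With the Kubernetes client it could be called as wait_for_phase(lambda: core_v1.read_namespaced_persistent_volume_claim(name=pvc_name, namespace=namespace).status.phase).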
Dask Kubernetes Cluster
The DaskKubernetesBackend wraps the Dask KubeCluster
class to distribute tasks on Kubernetes:
from openff.evaluator.backends.dask_kubernetes import DaskKubernetesBackend

# replace with own docker image
docker_image = "ghcr.io/lilyminium/openff-images:tmp-evaluator-dask-v2"
cluster_name = "evaluator-cluster"
namespace = "openforcefield"  # namespace on NRP

backend = DaskKubernetesBackend(
    cluster_name=cluster_name,
    gpu_resources_per_worker=gpu_resources_per_worker,  # see below
    cpu_resources_per_worker=cpu_resources_per_worker,  # see below
    image=docker_image,
    namespace=namespace,
    env={
        "OE_LICENSE": "/secrets/oe_license.txt",
        # daemonic processes are not allowed to have children
        "DASK_DISTRIBUTED__WORKER__DAEMON": "False",
        "DASK_LOGGING__DISTRIBUTED": "debug",
        "DASK__TEMPORARY_DIRECTORY": "/evaluator-storage",
        "STORAGE_DIRECTORY": "/evaluator-storage",
        "EXTRA_PIP_PACKAGES": "jupyterlab",
    },
    volumes=[volume],  # see below
    secrets=[secret],  # see below
    annotate_resources=True,  # see below
    cluster_kwargs={"resource_timeout": 300},
)
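The DASK_-prefixed entries in env are consumed by Dask's own configuration system on each worker, not by Evaluator. The sketch below is a simplified re-implementation based on our understanding of how dask.config reads environment variables: strip the DASK_ prefix, lower-case the rest, treat a double underscore as a nesting separator, and parse values as Python literals where possible. dask_env_to_config is a hypothetical name, not a Dask function. Under that reading, DASK_DISTRIBUTED__WORKER__DAEMON=False sets distributed.worker.daemon to False.

```python
import ast
from typing import Any, Dict, Mapping


def dask_env_to_config(env: Mapping[str, str]) -> Dict[str, Any]:
    """Approximate how Dask turns DASK_* variables into nested config (sketch)."""
    config: Dict[str, Any] = {}
    for name, raw in env.items():
        if not name.startswith("DASK_"):
            continue  # e.g. OE_LICENSE is not part of Dask's config
        # "DASK_A__B__C" -> ["a", "b", "c"]
        keys = name[len("DASK_"):].lower().split("__")
        try:
            value = ast.literal_eval(raw)  # "False" -> False, "300" -> 300
        except (SyntaxError, ValueError):
            value = raw  # plain strings such as "debug" stay strings
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return config
```

Dask also treats underscores and hyphens in key names as equivalent when looking up configuration, which this sketch does not model.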
Specifying pod resources
Pod resources should be specified using PodResources, which works analogously to ComputeResources,
but encodes settings for Kubernetes pods. For example:
from openff.units import unit
from openff.evaluator.backends import ComputeResources
from openff.evaluator.backends.dask_kubernetes import PodResources

ephemeral_storage = 20 * unit.gigabytes
memory = 8 * unit.gigabytes

gpu_resources_per_worker = PodResources(
    minimum_number_of_workers=0,
    maximum_number_of_workers=10,
    number_of_threads=1,
    memory_limit=memory,
    ephemeral_storage_limit=ephemeral_storage,
    number_of_gpus=1,
    preferred_gpu_toolkit=ComputeResources.GPUToolkit.CUDA,
)
cpu_resources_per_worker = PodResources(
    minimum_number_of_workers=0,
    maximum_number_of_workers=40,
    number_of_threads=1,
    memory_limit=memory,
    ephemeral_storage_limit=ephemeral_storage,
    number_of_gpus=0,
)
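Because minimum_number_of_workers=0 lets both pools scale down to nothing, the figure that matters for namespace quotas is the peak, when every pool is at maximum_number_of_workers. The arithmetic below assumes each worker pod is charged its full memory_limit and ephemeral_storage_limit; WorkerPool and peak_request are hypothetical helpers, not Evaluator classes.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerPool:
    """Hypothetical stand-in for the numbers passed to one PodResources."""

    max_workers: int
    memory_gb: float
    ephemeral_storage_gb: float
    gpus: int = 0


def peak_request(*pools: WorkerPool) -> dict:
    """Total resources if every pool scales to its maximum worker count."""
    return {
        "workers": sum(p.max_workers for p in pools),
        "memory_gb": sum(p.max_workers * p.memory_gb for p in pools),
        "ephemeral_storage_gb": sum(
            p.max_workers * p.ephemeral_storage_gb for p in pools
        ),
        "gpus": sum(p.max_workers * p.gpus for p in pools),
    }


# the GPU and CPU pools from the PodResources example above
gpu_pool = WorkerPool(max_workers=10, memory_gb=8, ephemeral_storage_gb=20, gpus=1)
cpu_pool = WorkerPool(max_workers=40, memory_gb=8, ephemeral_storage_gb=20)
print(peak_request(gpu_pool, cpu_pool))
# {'workers': 50, 'memory_gb': 400, 'ephemeral_storage_gb': 1000, 'gpus': 10}
```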
Specifying volumes
Volumes should be specified as a list of KubernetesPersistentVolumeClaim objects. For example:
from openff.evaluator.backends.dask_kubernetes import KubernetesPersistentVolumeClaim

volume = KubernetesPersistentVolumeClaim(
    name="evaluator-storage",  # `pvc_name`, the name of the PVC
    mount_path="/evaluator-storage",  # where to mount the PVC
)
Specifying secrets
Secrets should be specified as a list of KubernetesSecret objects. For example:
from openff.evaluator.backends.dask_kubernetes import KubernetesSecret

secret = KubernetesSecret(
    name="openeye-license",
    secret_name="oe-license",
    mount_path="/secrets/oe_license.txt",
    sub_path="oe_license.txt",
    read_only=True,
)
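This example mounts an OpenEye license: the Kubernetes secret named by secret_name (here, oe-license) is mounted into the pod at mount_path,
and sub_path selects the single file within the secret (oe_license.txt) that appears at that path.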
Note
The secret must first be created in Kubernetes, following the Kubernetes documentation.
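In plain Kubernetes terms, mounting a single file from a secret corresponds to a secret volume plus a volumeMount that sets subPath. The helper below builds that pod-spec fragment as dictionaries, using the values from the KubernetesSecret example; secret_file_mount is a hypothetical name, and this illustrates the Kubernetes fields rather than exactly what DaskKubernetesBackend generates.

```python
from typing import Any, Dict, Tuple


def secret_file_mount(
    volume_name: str,
    secret_name: str,
    mount_path: str,
    sub_path: str,
    read_only: bool = True,
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Return (volume, volumeMount) dicts in Kubernetes pod-spec form."""
    # pod-level volume backed by the existing Kubernetes secret
    volume = {"name": volume_name, "secret": {"secretName": secret_name}}
    # container-level mount of one key of that secret as a single file
    volume_mount = {
        "name": volume_name,      # must match the volume's name
        "mountPath": mount_path,  # full path of the file inside the container
        "subPath": sub_path,      # key inside the secret to expose
        "readOnly": read_only,
    }
    return volume, volume_mount


volume_spec, mount_spec = secret_file_mount(
    "openeye-license", "oe-license", "/secrets/oe_license.txt", "oe_license.txt"
)
```

One consequence of subPath: Kubernetes does not propagate later updates of a secret into containers that mount it via subPath, so worker pods need restarting after the license is replaced.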
Annotating resources
Dask allows you to specify whether tasks require particular
resources to be available on the worker used
to execute them. Setting annotate_resources=True will split tasks into those that can only be
executed on GPU workers, and those that can only be executed on CPU workers.
Simulation protocols such as OpenMMSimulation are executed on GPUs, whereas tasks such as packing boxes
are executed on CPUs. Splitting tasks this way will increase the GPU utilization of GPU workers.
The resources specified are ‘GPU’ (set to 0.5 per protocol to encourage multiple protocols to run on the same worker), and ‘notGPU’ (set to 1 per protocol). Workers are run with either the ‘GPU’ or ‘notGPU’ resource, and tasks are allocated to workers based on the resources they require.
Setting annotate_resources=False will allow tasks to be executed on any worker.
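Dask's abstract resources behave like counters: a task submitted with resources={"GPU": 0.5} can only start on a worker that still has at least 0.5 of "GPU" unclaimed. The sketch below counts how many copies of one task fit on one worker at the same time, assuming (hypothetically) that GPU workers advertise {"GPU": 1} and CPU workers advertise {"notGPU": 1}; max_concurrent is not a Dask function.

```python
from typing import Dict, List


def max_concurrent(worker: Dict[str, float], task: Dict[str, float]) -> int:
    """Copies of `task` that can hold their resources on `worker` at once.

    Every resource the task declares must be available on the worker;
    a resource the worker does not advertise counts as 0.
    """
    if not task:
        raise ValueError("task declares no resources")
    counts: List[int] = []
    for name, amount in task.items():
        available = worker.get(name, 0.0)
        counts.append(int(available // amount))
    return min(counts)


gpu_worker = {"GPU": 1}
cpu_worker = {"notGPU": 1}
simulation_task = {"GPU": 0.5}  # e.g. an OpenMMSimulation protocol
packing_task = {"notGPU": 1}    # e.g. packing a box
```

Dask also limits concurrency by each worker's thread count (number_of_threads above), which this sketch ignores.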
Dask Kubernetes Existing Backend
If you are unable to create a DaskKubernetesBackend remotely, you can connect to an existing KubeCluster
with the DaskKubernetesExistingBackend, using the same arguments:
from openff.evaluator.backends.dask_kubernetes import DaskKubernetesExistingBackend
backend = DaskKubernetesExistingBackend(
    cluster_name=cluster_name,
    gpu_resources_per_worker=gpu_resources_per_worker,
    cpu_resources_per_worker=cpu_resources_per_worker,
    image=docker_image,
    namespace=namespace,
    env={
        "OE_LICENSE": "/secrets/oe_license.txt",
        # daemonic processes are not allowed to have children
        "DASK_DISTRIBUTED__WORKER__DAEMON": "False",
        "DASK_LOGGING__DISTRIBUTED": "debug",
        "DASK__TEMPORARY_DIRECTORY": "/evaluator-storage",
        "STORAGE_DIRECTORY": "/evaluator-storage",
        "EXTRA_PIP_PACKAGES": "jupyterlab",
    },
    volumes=[volume],
    secrets=[secret],
    annotate_resources=True,
    cluster_kwargs={"resource_timeout": 300},
)
Not all of these need to be the same, as this backend simply connects to an
already initialized DaskKubernetesBackend. However, the following must match:
- cluster_name: for connection
- namespace: for connection
- gpu_resources_per_worker: the preferred_gpu_toolkit is important here, although the number of workers is not
- volumes: the PVC must be mounted
- secrets: an OpenEye license would ideally be mounted
- annotate_resources: this controls whether or not to split tasks between GPU and CPU workers
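The checklist above can be turned into a pre-flight check that compares the settings used to create the cluster with those passed to DaskKubernetesExistingBackend. This is a hypothetical sketch over plain dictionaries (for example, loaded from a shared config file); mismatched_settings and the flattened preferred_gpu_toolkit key are assumptions, not Evaluator API.

```python
from typing import Any, Dict, List

# settings that must be identical between the two backends (from the list above)
MUST_MATCH = ("cluster_name", "namespace", "volumes", "secrets", "annotate_resources")


def mismatched_settings(local: Dict[str, Any], remote: Dict[str, Any]) -> List[str]:
    """Return the names of required settings that differ between two configs."""
    problems = [key for key in MUST_MATCH if local.get(key) != remote.get(key)]
    # for gpu_resources_per_worker only the GPU toolkit matters, not worker counts
    if local.get("preferred_gpu_toolkit") != remote.get("preferred_gpu_toolkit"):
        problems.append("gpu_resources_per_worker.preferred_gpu_toolkit")
    return problems


local_settings = {
    "cluster_name": "evaluator-cluster",
    "namespace": "openforcefield",
    "volumes": ["evaluator-storage"],
    "secrets": ["openeye-license"],
    "annotate_resources": True,
    "preferred_gpu_toolkit": "CUDA",
}
```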
Deployment
The EvaluatorServer can be deployed remotely on NRP with the following code:
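from openff.evaluator.server import EvaluatorServer

port = 8000  # example port that the EvaluatorServer listens on

with backend:
    evaluator_server = EvaluatorServer(
        backend=backend,
        port=port,
        debug=True,
    )
    evaluator_server.start(asynchronous=False)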
Ideally this should be run as a Kubernetes deployment, so that the EvaluatorServer is kept running.
The EvaluatorServer port should then be forwarded so that ForceBalance can communicate with it on server_port.
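Port forwarding itself is done outside Python, for example with kubectl port-forward deployment/<deployment-name> 8000:8000 -n openforcefield, where <deployment-name> is a placeholder for your deployment and 8000 is an assumed server_port. Once the forward is running, a standard-library check like the sketch below (server_reachable is a hypothetical helper) confirms that something is listening on the local end before an EvaluatorClient is pointed at it.

```python
import socket


def server_reachable(host: str = "localhost", port: int = 8000, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # the connection is closed again as soon as the with-block exits
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused, timed out, or unresolvable host
        return False
```

A successful TCP connection only shows that the port is forwarded; it does not confirm that the EvaluatorServer behind it is healthy.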