"""
A collection of specialized workflow building blocks, which when chained together,
form a larger property estimation workflow.
"""
import copy
from propertyestimator.utils import graph, utils
from propertyestimator.utils.utils import get_nested_attribute, set_nested_attribute
from propertyestimator.workflow.decorators import protocol_input, MergeBehaviour
from propertyestimator.workflow.schemas import ProtocolSchema
from propertyestimator.workflow.utils import ProtocolPath
[docs]class BaseProtocol:
"""The base class for a protocol which would form one
step of a larger property calculation workflow.
A protocol may for example:
* create the coordinates of a mixed simulation box
* set up a bound ligand-protein system
* build the simulation topology
* perform an energy minimisation
An individual protocol may require a set of inputs, which may either be
set as constants
>>> from propertyestimator.protocols.simulation import RunOpenMMSimulation
>>>
>>> npt_equilibration = RunOpenMMSimulation('npt_equilibration')
>>> npt_equilibration.ensemble = RunOpenMMSimulation.Ensemble.NPT
or from the output of another protocol, pointed to by a ProtocolPath
>>> npt_production = RunOpenMMSimulation('npt_production')
>>> # Use the coordinate file output by the npt_equilibration protocol
>>> # as the input to the npt_production protocol
>>> npt_production.input_coordinate_file = ProtocolPath('output_coordinate_file',
...                                                     npt_equilibration.id)
In this way protocols may be chained together, thus defining a larger property
calculation workflow from simple, reusable building blocks.
.. warning:: This class is still heavily under development and is subject to
rapid changes.
"""
@property
def id(self):
    """str: The identifier which uniquely labels this protocol."""
    protocol_id = self._id
    return protocol_id
@property
def schema(self):
    """ProtocolSchema: A serializable description of this protocol, as
    built by `_get_schema`. Assigning a schema forwards it to `_set_schema`.
    """
    current_schema = self._get_schema()
    return current_schema


@schema.setter
def schema(self, schema_value):
    # All of the validation / copying logic lives in `_set_schema`.
    self._set_schema(schema_value)
@property
def dependencies(self):
    """list of ProtocolPath: The distinct references (in order of first
    appearance) through which this protocol takes input from *other*
    protocols. References without a start protocol, or which start at this
    protocol itself, are not reported.
    """
    found = []

    for required_path in self.required_inputs:

        # An input without any references simply contributes nothing here.
        for reference in self.get_value_references(required_path).values():

            source_id = reference.start_protocol
            points_elsewhere = source_id is not None and source_id != self.id

            if points_elsewhere and reference not in found:
                found.append(reference)

    return found
@protocol_input(value_type=bool)
def allow_merging(self):
    """bool: Whether this protocol may be merged with other, identical protocols."""
def __init__(self, protocol_id):
    """Constructs a new protocol.

    Parameters
    ----------
    protocol_id: str
        The unique id to assign to this protocol.
    """
    # The identifier of this node within a workflow.
    self._id = protocol_id

    # Merging with identical protocols is permitted unless switched off.
    self._allow_merging = True

    # Start from empty input / output lists; `_initialize` is
    # responsible for populating them.
    self.provided_outputs = []
    self.required_inputs = []

    self._initialize()
def execute(self, directory, available_resources):
    """Execute the protocol.

    This base implementation performs no work of its own; it ignores both
    arguments and simply reports the current values of the outputs.

    Parameters
    ----------
    directory: str
        The directory to store output data in.
    available_resources: ComputeResources
        The resources available to execute on.

    Returns
    -------
    Dict[str, Any]
        The output of the execution, as built by `_get_output_dictionary`.
    """
    outputs = self._get_output_dictionary()
    return outputs
def _initialize(self):
    """Initialize the protocol by rebuilding the lists of required inputs
    and provided outputs from the decorated attributes of this class, and
    by clearing the execution directory.
    """
    self.provided_outputs = []
    self.required_inputs = []

    protocol_type = type(self)

    output_names = utils.find_types_with_decorator(protocol_type, 'ProtocolOutputObject')
    input_names = utils.find_types_with_decorator(protocol_type, 'ProtocolInputObject')

    # Each decorated attribute is exposed as a path without a protocol id.
    self.provided_outputs.extend(ProtocolPath(name) for name in output_names)
    self.required_inputs.extend(ProtocolPath(name) for name in input_names)

    # The directory in which to execute the protocol - not yet known.
    self.directory = None
def _get_schema(self):
    """Builds a ProtocolSchema describing this protocol (i.e its id,
    type name and input values).

    Returns
    -------
    ProtocolSchema
        The schema representation.
    """
    result = ProtocolSchema()

    result.id = self.id
    result.type = type(self).__name__

    for path in self.required_inputs:

        owner_id = path.start_protocol

        # Only inputs which belong directly to this protocol are stored -
        # either the path carries no protocol id, or its only id is ours.
        belongs_to_self = owner_id is None or (owner_id == self.id and
                                               owner_id == path.last_protocol)

        if belongs_to_self:
            # Store a deep copy so that later edits to the schema
            # can never leak back into the protocol.
            result.inputs[path.full_path] = copy.deepcopy(self.get_value(path))

    return result
def _set_schema(self, schema_value):
    """Sets this protocols properties (i.e id and parameters)
    from a ProtocolSchema.

    Parameters
    ----------
    schema_value: ProtocolSchema
        The schema which will describe this protocol.

    Raises
    ------
    ValueError
        If the type recorded in the schema does not match the type of this
        protocol. The protocol is left untouched in this case.
    """
    # Validate *before* mutating anything, so that a rejected schema
    # cannot leave the protocol with a foreign id.
    if type(self).__name__ != schema_value.type:
        raise ValueError('Cannot convert a {} protocol to a {}.'
                         .format(str(type(self)), schema_value.type))

    # The id must be in place before the inputs are applied: `set_value`
    # rejects paths which explicitly target a different protocol id.
    self._id = schema_value.id

    for input_full_path in schema_value.inputs:

        # Copy the value so that the schema and the protocol never share state.
        value = copy.deepcopy(schema_value.inputs[input_full_path])

        input_path = ProtocolPath.from_string(input_full_path)
        self.set_value(input_path, value)
def _get_output_dictionary(self):
    """Collects the current value of every provided output.

    Returns
    -------
    Dict[str, Any]
        A dictionary keyed by the full path of each output, whose
        values are the corresponding output values.
    """
    return {
        output_path.full_path: self.get_value(output_path)
        for output_path in self.provided_outputs
    }
def set_uuid(self, value):
    """Store the uuid of the calculation this protocol belongs to, by
    appending it to the protocol id and to every input, output and
    referenced path.

    Parameters
    ----------
    value : str
        The uuid of the parent calculation.
    """
    # Nothing to do if the id already contains the uuid.
    if value in self.id:
        return

    self._id = graph.append_uuid(self.id, value)

    for required_path in self.required_inputs:

        required_path.append_uuid(value)

        # Paths which this input takes its value from must follow suit.
        for referenced_path in self.get_value_references(required_path).values():
            referenced_path.append_uuid(value)

    for provided_path in self.provided_outputs:
        provided_path.append_uuid(value)
def replace_protocol(self, old_id, new_id):
    """Finds each input which came from a given protocol
    and redirects it to instead take input from a new one.

    Notes
    -----
    This method is mainly intended to be used only when merging
    multiple protocols into one.

    Parameters
    ----------
    old_id : str
        The id of the old input protocol.
    new_id : str
        The id of the new input protocol.
    """
    # Rename this protocol first, so that the `self.id` comparison below
    # (and the ownership check performed by `get_value`) is made against
    # the id which the updated paths now carry.
    if self._id == old_id:
        self._id = new_id

    for input_path in self.required_inputs:

        input_path.replace_protocol(old_id, new_id)

        # Only inputs which belong directly to this protocol have their
        # references followed - i.e. the path has no protocol id, or its
        # only id is ours. Paths that point to child (e.g grouped)
        # protocols are left to the container classes. This is the same
        # test used by `_get_schema`, `can_merge` and `merge`.
        belongs_to_self = input_path.start_protocol is None or (
            input_path.start_protocol == input_path.last_protocol and
            input_path.start_protocol == self.id)

        if not belongs_to_self:
            continue

        for value_reference in self.get_value_references(input_path).values():
            value_reference.replace_protocol(old_id, new_id)

    for output_path in self.provided_outputs:
        output_path.replace_protocol(old_id, new_id)
def can_merge(self, other):
    """Determines whether this protocol can be merged with another.

    Two protocols are considered mergeable when this protocol allows
    merging, this protocol is an instance of the other protocol's type,
    and every direct input flagged as `MergeBehaviour.ExactlyEqual` is
    present on, and has an equal value in, both protocols.

    Parameters
    ----------
    other : :obj:`BaseProtocol`
        The protocol to compare against.

    Returns
    -------
    bool
        True if the two protocols are safe to merge.
    """
    if not self.allow_merging or not isinstance(self, type(other)):
        return False

    own_type = type(self)

    for input_path in self.required_inputs:

        owner_id = input_path.start_protocol

        # Do not consider paths that point to other or child (e.g grouped)
        # protocols. These should be handled by the container classes.
        is_direct_input = owner_id is None or (
            owner_id == input_path.last_protocol and owner_id == self.id)

        if not is_direct_input:
            continue

        # Inputs without a merge behaviour flag (for example the conditions
        # of a ConditionalGroup) are assumed to be handled explicitly elsewhere.
        descriptor = getattr(own_type, input_path.property_name, None)

        if not hasattr(descriptor, 'merge_behavior'):
            continue

        # Only inputs which must match exactly can veto a merge.
        if descriptor.merge_behavior != MergeBehaviour.ExactlyEqual:
            continue

        if input_path not in other.required_inputs:
            return False

        if self.get_value(input_path) != other.get_value(input_path):
            return False

    return True
def merge(self, other):
    """Merges another BaseProtocol with this one. The id
    of this protocol will remain unchanged.

    It is assumed that can_merge has already returned that
    these protocols are compatible to be merged together.

    Parameters
    ----------
    other: BaseProtocol
        The protocol to merge into this one.

    Returns
    -------
    Dict[str, str]
        A map between any original protocol ids and their new merged
        values. This implementation always returns an empty map.
    """
    own_type = type(self)

    for input_path in self.required_inputs:

        owner_id = input_path.start_protocol

        # Do not consider paths that point to child (e.g grouped) protocols.
        # These should be handled by the container classes themselves.
        is_direct_input = owner_id is None or (
            owner_id == input_path.last_protocol and owner_id == self.id)

        if not is_direct_input:
            continue

        # Inputs without a merge behaviour flag (for example the conditions
        # of a ConditionalGroup) are assumed to be handled explicitly elsewhere.
        descriptor = getattr(own_type, input_path.property_name, None)

        if not hasattr(descriptor, 'merge_behavior'):
            continue

        merge_behavior = descriptor.merge_behavior

        # Exactly equal inputs already agree, so there is nothing to combine.
        if merge_behavior == MergeBehaviour.ExactlyEqual:
            continue

        if merge_behavior == MergeBehaviour.SmallestValue:
            merged_value = min(self.get_value(input_path), other.get_value(input_path))
        elif merge_behavior == MergeBehaviour.GreatestValue:
            merged_value = max(self.get_value(input_path), other.get_value(input_path))
        else:
            # NOTE: any other behaviour falls through to here, which
            # results in the input being set to None.
            merged_value = None

        self.set_value(input_path, merged_value)

    return {}
def get_value_references(self, input_path):
    """Returns a dictionary of references to the protocols which one of this
    protocols inputs (specified by `input_path`) takes its value from.

    Notes
    -----
    Only an input whose value is itself a :obj:`ProtocolPath`, or is a
    `list` / `tuple` / `dict` containing :obj:`ProtocolPath` objects at its
    top level, yields any references. Every other value yields an empty
    dictionary.

    Parameters
    ----------
    input_path: :obj:`propertyestimator.workflow.utils.ProtocolPath`
        The input value to check.

    Returns
    -------
    dict of ProtocolPath and ProtocolPath
        A dictionary of the protocol paths that the input targeted by
        `input_path` depends upon. For container inputs the keys are paths
        of the form `property_name[index_or_key]`.
    """
    input_value = self.get_value(input_path)

    if isinstance(input_value, ProtocolPath):
        return {input_path: input_value}

    if not isinstance(input_value, (list, tuple, dict)):
        return {}

    property_name, protocols_ids = ProtocolPath.to_components(input_path.full_path)

    # Lists and tuples are addressed by position, dictionaries by key.
    if isinstance(input_value, dict):
        keyed_values = input_value.items()
    else:
        keyed_values = enumerate(input_value)

    return {
        ProtocolPath(property_name + '[{}]'.format(key), *protocols_ids): value
        for key, value in keyed_values
        if isinstance(value, ProtocolPath)
    }
def get_attribute_type(self, reference_path):
    """Returns the type of one of the protocol input/output attributes.

    Parameters
    ----------
    reference_path: ProtocolPath
        The path pointing to the value whose type to return.

    Returns
    -------
    type:
        The `value_type` declared on the class attribute, or `None` when
        the path points at a nested or indexed property.

    Raises
    ------
    ValueError
        If the path explicitly targets a different protocol.
    """
    owner_id = reference_path.start_protocol

    if owner_id is not None and owner_id != self.id:
        raise ValueError('The reference path {} does not point to this protocol'.format(reference_path))

    name = reference_path.property_name

    # The declared type can only be looked up for plain attribute names.
    # Names containing the property separator, or a '[' after their first
    # character, yield None rather than an error.
    if ProtocolPath.property_separator in name or name.find('[') > 0:
        return None

    return getattr(type(self), name).value_type
def get_value(self, reference_path):
    """Returns the value of one of this protocols inputs / outputs.

    Parameters
    ----------
    reference_path: ProtocolPath
        The path pointing to the value to return.

    Returns
    -------
    Any:
        The value of the input / output

    Raises
    ------
    ValueError
        If the path explicitly targets a different protocol, or if it does
        not name a property.
    """
    # A path without a protocol id is taken to refer to this protocol.
    if (reference_path.start_protocol is not None and
            reference_path.start_protocol != self.id):
        raise ValueError('The reference path does not target this protocol.')

    if reference_path.property_name is None or reference_path.property_name == '':
        raise ValueError('The reference path does not specify a property to return.')

    return get_nested_attribute(self, reference_path.property_name)
def set_value(self, reference_path, value):
    """Sets the value of one of this protocols inputs.

    Parameters
    ----------
    reference_path: ProtocolPath
        The path pointing to the value to set.
    value: Any
        The value to set.

    Raises
    ------
    ValueError
        If the path explicitly targets a different protocol, if it does
        not name a property, or if it points at one of the outputs of
        this protocol.
    """
    # A path without a protocol id is taken to refer to this protocol.
    if (reference_path.start_protocol is not None and
            reference_path.start_protocol != self.id):
        raise ValueError('The reference path does not target this protocol.')

    if reference_path.property_name is None or reference_path.property_name == '':
        raise ValueError('The reference path does not specify a property to set.')

    # Outputs are not settable through this method.
    if reference_path in self.provided_outputs:
        raise ValueError('Output values cannot be set by this method.')

    set_nested_attribute(self, reference_path.property_name, value)
def apply_replicator(self, replicator, template_values, template_index=-1,
                     template_value=None, update_input_references=False):
    """Applies a `ProtocolReplicator` to this protocol. This method
    should clone any protocols whose id contains the id of the
    replicator (in the format `$(replicator.id)`).

    This base implementation replicates nothing, and always reports
    that no protocols were replicated.

    Parameters
    ----------
    replicator: ProtocolReplicator
        The replicator to apply.
    template_values: list of Any
        A list of the values which will be inserted
        into the newly replicated protocols.

        This parameter is mutually exclusive with
        `template_index` and `template_value`
    template_index: int, optional
        A specific index which should be used for any
        protocols flagged as to be replicated by the
        replicator. This option is mainly used when
        replicating children of an already replicated
        protocol.

        This parameter is mutually exclusive with
        `template_values` and must be set along with
        a `template_value`.
    template_value: Any, optional
        A specific value which should be used for any
        protocols flagged as to be replicated by the
        replicator. This option is mainly used when
        replicating children of an already replicated
        protocol.

        This parameter is mutually exclusive with
        `template_values` and must be set along with
        a `template_index`.
    update_input_references: bool
        If true, any protocols which take their input from a protocol
        which was flagged for replication will be updated to take input
        from the actually replicated protocol. This should only be set
        to true if this protocol is not nested within a workflow or a
        protocol group.

        This option cannot be used when a specific `template_index` or
        `template_value` is provided.

    Returns
    -------
    dict of ProtocolPath and list of tuple of ProtocolPath and int
        A dictionary of references to all of the protocols which have
        been replicated, with keys of original protocol ids. Each value
        is comprised of a list of the replicated protocol ids, and their
        index into the `template_values` array.
    """
    replicated_protocols = dict()
    return replicated_protocols