Evaluator client side API.
import copy
import json
import logging
import socket
import traceback
from collections import defaultdict
from enum import Enum
from time import sleep
from openff.evaluator.attributes import UNDEFINED, Attribute, AttributeClass
from openff.evaluator.datasets import PhysicalPropertyDataSet
from openff.evaluator.forcefield import (
from openff.evaluator.layers import (
from openff.evaluator.layers.workflow import WorkflowCalculationSchema
from openff.evaluator.utils.exceptions import EvaluatorException
from openff.evaluator.utils.serialization import TypedJSONDecoder
from openff.evaluator.utils.tcp import (
logger = logging.getLogger(__name__)
[docs]class ConnectionOptions(AttributeClass):
"""The options to use when connecting to an `EvaluatorServer`"""
server_address = Attribute(
docstring="The address of the server to connect to.",
server_port = Attribute(
docstring="The port of the server to connect to.",
def __init__(self, server_address=None, server_port=None):
    """Create a new set of connection options.

    Parameters
    ----------
    server_address: str
        The address of the server to connect to.
    server_port: int
        The port of the server to connect to.
    """
    # Only values which were explicitly provided are assigned; an argument
    # left as `None` does not touch the corresponding attribute.
    provided_values = (
        ("server_address", server_address),
        ("server_port", server_port),
    )

    for attribute_name, value in provided_values:
        if value is not None:
            setattr(self, attribute_name, value)
class BatchMode(Enum):
    """The ways in which a server may group the properties of a request
    into batches.

    Properties are only marked as estimated by the server once every
    property in their batch has completed.

    The available modes are:

    * SameComponents: properties measured for substances which contain
      exactly the same components share a batch. E.g. the densities of an
      80:20 and a 20:80 mix of ethanol and water are batched together, while
      the density of pure ethanol and the density of pure water are placed
      into separate batches.

    * SharedComponents: properties measured for substances which have at
      least one component in common share a batch. E.g. the densities of
      80:20 and 20:80 mixtures of ethanol and water, and the densities of
      pure ethanol and pure water, are all batched together.

    * NoBatch: no batching is performed. Each property is estimated in a
      single, sequentially-increasing batch.
    """

    SameComponents = "SameComponents"
    SharedComponents = "SharedComponents"
    NoBatch = "NoBatch"
[docs]class Request(AttributeClass):
"""An estimation request which has been sent to a `EvaluatorServer`
This object can be used to query and retrieve the results of the
request when finished, or be stored to retrieve the request at some
point in the future."""
id = Attribute(
docstring="The unique id assigned to this request by the server.", type_hint=str
connection_options = Attribute(
docstring="The options used to connect to the server handling the request.",
def __init__(self, client=None):
    """Create a new request object.

    Parameters
    ----------
    client: EvaluatorClient, optional
        The client which submitted this request.
    """
    if client is not None:
        # Record where the request was sent so that the request can be
        # re-connected to the same server later on.
        self.connection_options = ConnectionOptions()

        server_details = self.connection_options
        server_details.server_address = client.server_address
        server_details.server_port = client.server_port

    self._client = client
def results(self, synchronous=False, polling_interval=5):
    """Attempt to retrieve the results of the request from the server.

    If the method is run synchronously it will block the main thread
    until either all of the requested properties have been estimated,
    or an exception is returned.

    Parameters
    ----------
    synchronous: bool
        If `True`, this method will block the main thread until
        the server either returns a result or an error.
    polling_interval: float
        If running synchronously, this is the time interval (seconds)
        between checking if the calculation has finished. This will
        be ignored if running asynchronously.

    Returns
    -------
    RequestResult, optional
        Returns the current results of the request. This may
        be `None` if any unexpected exceptions occurred while
        retrieving the estimate.
    EvaluatorException, optional
        The exception raised while trying to retrieve the result,
        if any.
    """
    client = self._client

    # A new client is needed when none is attached to this request, or when
    # the attached client points at a different server than the one recorded
    # in the connection options of this request.
    if (
        client is None
        or client.server_address != self.connection_options.server_address
        or client.server_port != self.connection_options.server_port
    ):
        client = EvaluatorClient(self.connection_options)
        self._client = client

    return client.retrieve_results(self.id, synchronous, polling_interval)
def __str__(self):
    """Return a short, human readable label containing the request id."""
    return "Request id={}".format(self.id)
def __repr__(self):
    """Return the string form of the request wrapped in angle brackets."""
    label = str(self)
    return "<" + label + ">"
[docs]class RequestOptions(AttributeClass):
"""The options to use when requesting a set of physical
properties be estimated by the server.
calculation_layers = Attribute(
docstring="The calculation layers which may be used to "
"estimate the set of physical properties. The order in which "
"the layers appears in this list determines the order in which "
"the layers will attempt to estimate the data set.",
default_value=["ReweightingLayer", "SimulationLayer"],
calculation_schemas = Attribute(
docstring="The schemas that each calculation layer should "
"use when estimating the set of physical properties. The "
"dictionary should be of the form [property_type][layer_type].",
batch_mode = Attribute(
docstring="The way in which the server should batch together "
"properties to estimate. Properties will only be marked as finished "
"when all properties in a single batch are completed.",
def add_schema(self, layer_type, property_type, schema):
    """A convenience function for adding a calculation schema
    to the schema dictionary.

    Parameters
    ----------
    layer_type: str or type of CalculationLayer
        The layer to associate the schema with.
    property_type: str or type of PhysicalProperty
        The class of property to associate the schema with.
    schema: CalculationSchema
        The schema to add.

    Raises
    ------
    AssertionError
        If the layer is not a registered calculation layer, or if the
        schema is not of the type required by that layer.
    """
    # Both the layer and the property may be given either by name or as a
    # class. The layer registry and the schema dictionary are keyed by name.
    if isinstance(layer_type, type):
        layer_type = layer_type.__name__
    if isinstance(property_type, type):
        property_type = property_type.__name__

    # Validate the schema.
    schema.validate()

    # Make sure the schema is compatible with the layer.
    assert (
        layer_type in registered_calculation_layers
    ), f"{layer_type} is not a registered calculation layer."

    calculation_layer = registered_calculation_layers[layer_type]
    required_type = calculation_layer.required_schema_type()

    # `isinstance` mirrors the check performed by `validate`.
    assert isinstance(
        schema, required_type
    ), f"The {layer_type} layer requires a {required_type.__name__} schema."

    if self.calculation_schemas == UNDEFINED:
        self.calculation_schemas = {}

    self.calculation_schemas.setdefault(property_type, {})[layer_type] = schema
def validate(self, attribute_type=None):
    """Validate the options, in addition to the checks performed by the
    parent class.

    Every calculation layer must be named by a string and be registered,
    and every provided schema must target one of the requested layers and
    be of the schema type which that layer requires.

    Parameters
    ----------
    attribute_type: optional
        Forwarded unchanged to the parent class `validate`.
    """
    super(RequestOptions, self).validate(attribute_type)

    requested_layers = self.calculation_layers

    assert all(isinstance(layer, str) for layer in requested_layers)
    assert all(layer in registered_calculation_layers for layer in requested_layers)

    provided_schemas = self.calculation_schemas

    # Nothing more to check if no schemas were provided.
    if provided_schemas == UNDEFINED:
        return

    for layer_schemas in provided_schemas.values():
        assert isinstance(layer_schemas, dict)

        for layer_type, schema in layer_schemas.items():
            assert layer_type in requested_layers

            layer_class = registered_calculation_layers[layer_type]
            assert isinstance(schema, layer_class.required_schema_type())
[docs]class RequestResult(AttributeClass):
"""The current results of an estimation request - these
results may be partial if the server hasn't yet completed
the request.
queued_properties = Attribute(
docstring="The set of properties which have yet to be, or "
"are currently being estimated.",
estimated_properties = Attribute(
docstring="The set of properties which have been successfully estimated.",
unsuccessful_properties = Attribute(
docstring="The set of properties which could not be successfully estimated.",
exceptions = Attribute(
docstring="The set of exceptions which were raised while estimating "
"the properties.",
def validate(self, attribute_type=None):
    """Validate the results, in addition to the checks performed by the
    parent class, by checking that every stored exception is an
    `EvaluatorException`.

    Parameters
    ----------
    attribute_type: optional
        Forwarded unchanged to the parent class `validate`.
    """
    super(RequestResult, self).validate(attribute_type)

    for stored_exception in self.exceptions:
        assert isinstance(stored_exception, EvaluatorException)
[docs]class EvaluatorClient:
"""The object responsible for connecting to, and submitting
physical property estimation requests to an `EvaluatorServer`.
These examples assume that an `EvaluatorServer` has been set up
and is running (either synchronously or asynchronously). This
server can be connected to by creating an `EvaluatorClient`:
>>> from openff.evaluator.client import EvaluatorClient
>>> property_estimator = EvaluatorClient()
If the `EvaluatorServer` is not running on the local machine, you will
need to specify its address and the port that it is listening on:
>>> from openff.evaluator.client import ConnectionOptions
>>> connection_options = ConnectionOptions(server_address='server_address',
>>> server_port=8000)
>>> property_estimator = EvaluatorClient(connection_options)
To asynchronously submit a request to the running server using the default
estimation options:
>>> # Load in the data set of properties which will be used for comparisons
>>> from openff.evaluator.datasets.thermoml import ThermoMLDataSet
>>> data_set = ThermoMLDataSet.from_doi('10.1016/j.jct.2016.10.001')
>>> # Filter the dataset to only include densities measured between 130-260 K
>>> from openff.units import unit
>>> from openff.evaluator.properties import Density
>>> data_set.filter_by_property_types(Density)
>>> data_set.filter_by_temperature(
>>> min_temperature=130*unit.kelvin,
>>> max_temperature=260*unit.kelvin
>>> )
>>> # Load in the force field parameters
>>> from openff.evaluator.forcefield import SmirnoffForceFieldSource
>>> force_field_source = SmirnoffForceFieldSource.from_path('smirnoff99Frosst-1.1.0.offxml')
>>> # Submit the estimation request to a running server.
>>> request = property_estimator.request_estimate(data_set, force_field_source)
The status of the request can be asynchronously queried by calling
>>> results = request.results()
or the main thread can be blocked until the results are
available by calling
>>> results = request.results(synchronous=True)
How the property set will be estimated can easily be controlled by passing a
`RequestOptions` object to the estimate commands.
The calculations layers which will be used to estimate the properties can be
controlled for example like so:
>>> from openff.evaluator.layers.reweighting import ReweightingLayer
>>> from openff.evaluator.layers.simulation import SimulationLayer
>>> options = RequestOptions(calculation_layers=[
>>> "ReweightingLayer",
>>> "SimulationLayer"
>>> ])
>>> request = property_estimator.request_estimate(data_set, force_field_source, options)
Options for how properties should be estimated can be set on a per property, and per layer
basis by providing a calculation schema to the options object.
>>> from openff.evaluator.properties import DielectricConstant
>>> # Generate a schema to use when estimating densities directly
>>> # from simulations.
>>> density_simulation_schema = Density.default_simulation_schema()
>>> # Generate a schema to use when estimating dielectric constants
>>> # from cached simulation data.
>>> dielectric_reweighting_schema = DielectricConstant.default_reweighting_schema()
>>> options.calculation_schemas = {
>>>     'Density': {'SimulationLayer': density_simulation_schema},
>>>     'DielectricConstant': {'ReweightingLayer': dielectric_reweighting_schema}
>>> }
>>> property_estimator.request_estimate(
>>> data_set,
>>> force_field_source,
>>> options,
>>> )
The gradients of the observables of interest with respect to a number of chosen
parameters can be requested by passing a `parameter_gradient_keys` parameter.
In the below example, gradients will be calculated with respect to both the
bond length parameter for the [#6:1]-[#8:2] chemical environment, and the bond
angle parameter for the [*:1]-[#8:2]-[*:3] chemical environment:
>>> from openff.evaluator.forcefield import ParameterGradientKey
>>> parameter_gradient_keys = [
>>>     ParameterGradientKey('Bonds', '[#6:1]-[#8:2]', 'length'),
>>> ParameterGradientKey('Angles', '[*:1]-[#8:2]-[*:3]', 'angle')
>>> ]
>>> property_estimator.request_estimate(
>>> data_set,
>>> force_field_source,
>>> options,
>>> parameter_gradient_keys
>>> )
class _Submission(AttributeClass):
"""The data packet encoding an estimation request which will be sent to
the server.
dataset = Attribute(
docstring="The set of properties to estimate.",
options = Attribute(
docstring="The options to use when estimating the dataset.",
force_field_source = Attribute(
docstring="The force field parameters to estimate the dataset using.",
parameter_gradient_keys = Attribute(
docstring="A list of the parameters that the physical properties "
"should be differentiated with respect to.",
def validate(self, attribute_type=None):
    """Validate the submission, in addition to the checks performed by
    the parent class, by checking that every gradient key is a
    `ParameterGradientKey`.

    Parameters
    ----------
    attribute_type: optional
        Forwarded unchanged to the parent class `validate`.
    """
    super(EvaluatorClient._Submission, self).validate(attribute_type)

    for gradient_key in self.parameter_gradient_keys:
        assert isinstance(gradient_key, ParameterGradientKey)
# Exposed as a property: callers read `client.server_address` as a plain
# attribute rather than calling it.
@property
def server_address(self):
    """str: The address of the server that this client is connected to."""
    return self._connection_options.server_address
# Exposed as a property: callers read `client.server_port` as a plain
# attribute rather than calling it.
@property
def server_port(self):
    """int: The port of the server that this client is connected to."""
    return self._connection_options.server_port
def __init__(self, connection_options=None):
    """Create a new client.

    Parameters
    ----------
    connection_options: ConnectionOptions, optional
        The options used when connecting to the calculation
        server. If `None`, default options are used.

    Raises
    ------
    ValueError
        If the connection options do not contain a server address.
    """
    if connection_options is None:
        connection_options = ConnectionOptions()

    if connection_options.server_address is None:
        # The trailing space keeps the two adjacent literals from running
        # together into "runthese".
        raise ValueError(
            "The address of the server which will run "
            "these calculations must be given."
        )

    self._connection_options = connection_options
@staticmethod
def default_request_options(data_set, force_field_source):
    """Build the `RequestOptions` which are used to estimate a set of
    properties when the user does not provide any.

    Parameters
    ----------
    data_set: PhysicalPropertyDataSet
        The data set which would be estimated.
    force_field_source: ForceFieldSource
        The force field parameters which would be used to estimate the
        data set.

    Returns
    -------
    RequestOptions
        The default options.
    """
    default_options = RequestOptions()

    # A fresh options object has no schemas set, so populating it fills in
    # the registered defaults for the property types of the data set.
    EvaluatorClient._populate_request_options(
        default_options, data_set, force_field_source
    )

    return default_options
@staticmethod
def _default_protocol_replacements(force_field_source):
    """Returns the default set of protocols in a workflow to replace
    with different types. This is mainly to handle replacing the base
    force field assignment protocol with one specific to the force field
    source.

    Parameters
    ----------
    force_field_source: ForceFieldSource
        The force field parameters which will be used to estimate the
        data set.

    Returns
    -------
    dict of str and str
        A map between the type of protocol to replace, and the type of
        protocol to use in its place. The map is empty if the type of
        force field source is not recognised.
    """
    # Checked in order; the first matching source type wins.
    build_protocols = (
        (SmirnoffForceFieldSource, "BuildSmirnoffSystem"),
        (LigParGenForceFieldSource, "BuildLigParGenSystem"),
        (TLeapForceFieldSource, "BuildTLeapSystem"),
        (FoyerForceFieldSource, "BuildFoyerSystem"),
    )

    for source_type, protocol_type in build_protocols:
        if isinstance(force_field_source, source_type):
            return {"BaseBuildSystem": protocol_type}

    return {}
@staticmethod
def _populate_request_options(options, data_set, force_field_source):
    """Populates any missing attributes of a `RequestOptions`
    object with default values registered via the plug-in
    system.

    Parameters
    ----------
    options: RequestOptions
        The object to populate with defaults. It is modified in place.
    data_set: PhysicalPropertyDataSet
        The data set to be estimated using the options.
    force_field_source: ForceFieldSource
        The force field parameters which will be used to estimate the
        data set.

    Raises
    ------
    ValueError
        If no calculation schema could be found for one or more of the
        property types in the data set.
    """
    # Retrieve the types of properties in the data set.
    property_types = data_set.property_types

    if options.calculation_schemas == UNDEFINED:
        options.calculation_schemas = defaultdict(dict)

    calculation_schemas = options.calculation_schemas

    # Property types for which the user provided schemas are already covered.
    properties_without_schemas = set(property_types)
    properties_without_schemas.difference_update(calculation_schemas)

    # Assign default calculation schemas in the cases where the user
    # hasn't provided one.
    for calculation_layer in options.calculation_layers:
        # Check if this layer has any registered schemas.
        if calculation_layer not in registered_calculation_schemas:
            continue

        default_layer_schemas = registered_calculation_schemas[calculation_layer]

        for property_type in property_types:
            # Never override a schema which the user has already provided.
            existing_schema = calculation_schemas.get(property_type, {}).get(
                calculation_layer, None
            )

            if existing_schema is not None:
                continue

            # Check if this property type has any registered schemas for
            # the given calculation layer.
            if property_type not in default_layer_schemas:
                continue

            # noinspection PyTypeChecker
            default_schema = default_layer_schemas[property_type]

            # A default may be registered as a factory rather than as a
            # ready-made schema.
            if callable(default_schema):
                default_schema = default_schema()

            # Mark this property as having at least one registered
            # calculation schema.
            properties_without_schemas.discard(property_type)

            calculation_schemas.setdefault(property_type, {})[
                calculation_layer
            ] = default_schema

    # Make sure all property types have at least one registered
    # calculation schema.
    if properties_without_schemas:
        # Sorted so that the message does not depend on set ordering.
        type_string = ", ".join(sorted(properties_without_schemas))

        raise ValueError(
            f"No calculation schema could be found for "
            f"the {type_string} properties."
        )

    # Perform any protocol type replacements
    replacement_types = EvaluatorClient._default_protocol_replacements(
        force_field_source
    )

    for calculation_layer in options.calculation_layers:
        for property_type in property_types:
            # Skip property / layer pairs which have no schema at all.
            schema = calculation_schemas.get(property_type, {}).get(
                calculation_layer, None
            )

            # Only workflow based schemas contain protocols to replace.
            if not isinstance(schema, WorkflowCalculationSchema):
                continue

            workflow_schema = schema.workflow_schema
            workflow_schema.replace_protocol_types(replacement_types)
def request_estimate(
    self,
    property_set,
    force_field_source,
    options=None,
    parameter_gradient_keys=None,
):
    """Submits a request for the `EvaluatorServer` to attempt to estimate
    the data set of physical properties using the specified force field
    parameters according to the provided options.

    Parameters
    ----------
    property_set : PhysicalPropertyDataSet
        The set of properties to estimate.
    force_field_source : ForceFieldSource or openff.toolkit.typing.engines.smirnoff.ForceField
        The force field parameters to estimate the properties using.
    options : RequestOptions, optional
        A set of estimator options. If `None` default options
        will be used (see `default_request_options`).
    parameter_gradient_keys: list of ParameterGradientKey, optional
        A list of the parameters that the physical properties should
        be differentiated with respect to.

    Returns
    -------
    Request, optional
        An object which will provide access to the results of this
        request. This is `None` if an error was returned.
    EvaluatorException, optional
        Any exceptions raised while attempting to submit the request.

    Raises
    ------
    ValueError
        If either the data set or the force field source is `None`.
    """
    from openff.toolkit.typing.engines import smirnoff

    if property_set is None or force_field_source is None:
        raise ValueError(
            "Both a data set and force field source must be "
            "present to compute physical properties."
        )

    gradient_keys = [] if parameter_gradient_keys is None else parameter_gradient_keys

    # Handle the conversion of a SMIRNOFF force field object
    # for backwards compatibility.
    if isinstance(force_field_source, smirnoff.ForceField):
        force_field_source = SmirnoffForceFieldSource.from_object(
            force_field_source
        )

    # Fill in any missing options with default values
    if options is None:
        options = self.default_request_options(property_set, force_field_source)
    else:
        # Work on a copy so that the object owned by the caller is not
        # modified when the defaults are filled in.
        options = copy.deepcopy(options)
        self._populate_request_options(options, property_set, force_field_source)

    # Make sure the options are valid.
    options.validate()

    # Build the submission object.
    submission = EvaluatorClient._Submission()
    submission.dataset = property_set
    submission.force_field_source = force_field_source
    submission.options = options
    submission.parameter_gradient_keys = gradient_keys

    # Ensure the submission is valid.
    submission.validate()

    # Send the submission to the server.
    request_id, error = self._send_calculations_to_server(submission)

    # No request object is built if the submission failed.
    if error is not None:
        return None, error

    # Build the object which represents this request.
    request_object = Request(self)
    request_object.id = request_id

    return request_object, error
def retrieve_results(self, request_id, synchronous=False, polling_interval=5):
    """Retrieves the current results of a request from the server.

    Parameters
    ----------
    request_id: str
        The server assigned id of the request.
    synchronous: bool
        If true, this method will block the main thread until the server
        either returns a result or an error.
    polling_interval: float
        If running synchronously, this is the time interval (seconds)
        between checking if the request has completed.

    Returns
    -------
    RequestResult, optional
        Returns the current results of the request. This may
        be `None` if any unexpected exceptions occurred while
        retrieving the estimate.
    EvaluatorException, optional
        The exception raised while trying to retrieve the result,
        if any.
    """
    # If running asynchronously, just return whatever the server
    # sends back. Truthiness (rather than `is False`) is used so that
    # falsy non-`bool` values such as `0` do not trigger blocking.
    if not synchronous:
        return self._send_query_to_server(request_id)

    assert polling_interval >= 0, "The polling interval must not be negative."

    while True:
        if polling_interval > 0:
            sleep(polling_interval)

        response, error = self._send_query_to_server(request_id)

        # Keep polling for as long as the server reports that some
        # properties are still queued.
        if (
            isinstance(response, RequestResult)
            and len(response.queued_properties) > 0
        ):
            continue

        # Only report completion when the server did not return an error.
        if error is None:
            logger.info(f"The server has completed request {request_id}.")

        return response, error
def _send_calculations_to_server(self, submission):
    """Attempts to connect to the calculation server, and
    submit the requested calculations.

    Parameters
    ----------
    submission: _Submission
        The jobs to submit.

    Returns
    -------
    str, optional:
        The id which the server has assigned the submitted calculations.
        This can be used to query the server for when the calculation
        has completed.

        Returns None if the calculation could not be submitted.
    EvaluatorException, optional
        Any exceptions raised while attempting to submit the request.
    """
    # Attempt to establish a connection to the server.
    connection_settings = (
        self._connection_options.server_address,
        self._connection_options.server_port,
    )

    request_id = None

    # The `with` block closes the socket on every path, including when
    # connecting fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
        connection.connect(connection_settings)

        try:
            # Encode the submission json into an encoded
            # packet ready to submit to the server.
            message_type = pack_int(EvaluatorMessageTypes.Submission)

            encoded_json = submission.json().encode()
            length = pack_int(len(encoded_json))

            connection.sendall(message_type + length + encoded_json)

            # Wait for confirmation that the server has received
            # the jobs.
            header = recvall(connection, 4)
            length = unpack_int(header)[0]

            # Decode the response from the server. If everything
            # went well, this should be the id of the submitted
            # calculations.
            encoded_json = recvall(connection, length)
            request_id, error = json.loads(
                encoded_json.decode(), cls=TypedJSONDecoder
            )

        except Exception as e:
            # Failures while talking to the server are returned to the
            # caller rather than raised.
            trace = traceback.format_exception(None, e, e.__traceback__)
            error = EvaluatorException(message=trace)

    # Return the ids of the submitted jobs.
    return request_id, error
def _send_query_to_server(self, request_id):
    """Attempts to connect to the calculation server, and query the
    current state of a previously submitted request.

    Parameters
    ----------
    request_id: str
        The id of the job to query.

    Returns
    -------
    object, optional
        The first item decoded from the server response, or `None` if the
        server sent back an empty response.
    object, optional
        The second item decoded from the server response, or `None` if
        the server sent back an empty response.
    """
    server_response = None

    # Attempt to establish a connection to the server.
    connection_settings = (
        self._connection_options.server_address,
        self._connection_options.server_port,
    )

    # The `with` block closes the socket on every path, including when
    # connecting fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as connection:
        connection.connect(connection_settings)

        # Encode the request id into the message.
        message_type = pack_int(EvaluatorMessageTypes.Query)

        encoded_request_id = request_id.encode()
        length = pack_int(len(encoded_request_id))

        connection.sendall(message_type + length + encoded_request_id)

        # Wait for the server response.
        header = recvall(connection, 4)
        length = unpack_int(header)[0]

        # Decode the response from the server. A zero length header means
        # that the server did not send a payload.
        if length > 0:
            encoded_json = recvall(connection, length)
            server_response = encoded_json.decode()

    if server_response is None:
        return None, None

    response, error = json.loads(server_response, cls=TypedJSONDecoder)
    return response, error