"""
The simulation reweighting estimation layer.
"""
import abc
import json
import logging
import os
from propertyestimator.layers import (
PropertyCalculationLayer,
register_calculation_layer,
)
from propertyestimator.substances import Substance
from propertyestimator.utils.serialization import TypedJSONEncoder
from propertyestimator.utils.utils import SubhookedABCMeta
from propertyestimator.workflow import Workflow, WorkflowGraph
from propertyestimator.workflow.workflow import IWorkflowProperty
class IReweightable(SubhookedABCMeta):
@property
@abc.abstractmethod
def multi_component_property(self):
"""bool: Returns whether this property is dependant on properties of the
full mixed substance, or whether it is also dependant on the properties
of the individual components also.
"""
pass
@property
@abc.abstractmethod
def required_data_class(self):
"""subclass of BaseStoredData: The data class required to reweight this
property (e.g. `StoredSimulationData`).
"""
pass
[docs]@register_calculation_layer()
class ReweightingLayer(PropertyCalculationLayer):
"""A calculation layer which aims to calculate physical properties by
reweighting the results of previous calculations.
.. warning :: This class is still heavily under development and is subject to
rapid changes.
"""
[docs] @staticmethod
def schedule_calculation(
calculation_backend,
storage_backend,
layer_directory,
data_model,
callback,
synchronous=False,
):
# Make a local copy of the target force field.
target_force_field_source = storage_backend.retrieve_force_field(
data_model.force_field_id
)
target_force_field_path = os.path.join(
layer_directory, data_model.force_field_id
)
with open(target_force_field_path, "w") as file:
file.write(target_force_field_source.json())
stored_data_paths = ReweightingLayer._retrieve_stored_data(
data_model.queued_properties, storage_backend, layer_directory
)
workflow_graph = ReweightingLayer._build_workflow_graph(
layer_directory,
data_model.queued_properties,
target_force_field_path,
stored_data_paths,
data_model.parameter_gradient_keys,
data_model.options,
)
reweighting_futures = workflow_graph.submit(calculation_backend)
PropertyCalculationLayer._await_results(
calculation_backend,
storage_backend,
layer_directory,
data_model,
callback,
reweighting_futures,
synchronous,
)
@staticmethod
def _retrieve_stored_data(physical_properties, storage_backend, layer_directory):
"""Extract all of the stored data from the backend which may be
used in reweighting
Parameters
----------
physical_properties: list of PhysicalProperty
The physical properties to attempt to estimate.
storage_backend: PropertyEstimatorStorage
The storage backend to retrieve the data from.
layer_directory: str
The directory in which to store the retrieved data.
Returns
-------
dict of str and dict of str and tuple(str, str, str)
A dictionary partitioned by substance identifiers and the type,
of data class, whose values are a tuple of a path to a stored
simulation data object, it's ancillary data directory, and its
corresponding force field path.
"""
data_paths = {}
for physical_property in physical_properties:
if not isinstance(physical_property, IReweightable):
# Only properties which implement the IReweightable
# interface can be reweighted
continue
existing_data = storage_backend.retrieve_simulation_data(
physical_property.substance,
physical_property.multi_component_property,
physical_property.required_data_class,
)
if len(existing_data) == 0:
continue
# Take data from the storage backend and save it in the working directory.
for substance_id in existing_data:
# Register the substance id with the return dictionary
if substance_id not in data_paths:
data_paths[substance_id] = {}
for data_object, data_directory in existing_data[substance_id]:
# Register this objects data class type with the
# return dictionary
if type(data_object) not in data_paths[substance_id]:
data_paths[substance_id][type(data_object)] = []
data_object_path = os.path.join(
layer_directory, f"{os.path.basename(data_directory)}.json"
)
# Save a local copy of the data object file.
if not os.path.isfile(data_object_path):
with open(data_object_path, "w") as file:
json.dump(data_object, file, cls=TypedJSONEncoder)
force_field_path = os.path.join(
layer_directory, data_object.force_field_id
)
path_tuple = (data_object_path, data_directory, force_field_path)
if path_tuple in data_paths[substance_id][type(data_object)]:
continue
# Save a local copy of the force field file if one
# does not already exist.
if not os.path.isfile(force_field_path):
existing_force_field = storage_backend.retrieve_force_field(
data_object.force_field_id
)
with open(force_field_path, "w") as file:
file.write(existing_force_field.json())
data_paths[substance_id][type(data_object)].append(path_tuple)
return data_paths
@staticmethod
def _build_workflow_graph(
working_directory,
properties,
target_force_field_path,
stored_data_paths,
parameter_gradient_keys,
options,
):
"""Construct a workflow graph, containing all of the workflows which should
be followed to estimate a set of properties by reweighting.
Parameters
----------
working_directory: str
The local directory in which to store all local,
temporary calculation data from this graph.
properties : list of PhysicalProperty
The properties to attempt to compute.
target_force_field_path : str
The path to the target force field parameters to use in the workflow.
stored_data_paths: dict of str and tuple(str, str)
A dictionary partitioned by substance identifiers, whose values
are a tuple of a path to a stored simulation data object, and
its corresponding force field path.
parameter_gradient_keys: list of ParameterGradientKey
A list of references to all of the parameters which all observables
should be differentiated with respect to.
options: PropertyEstimatorOptions
The options to run the workflows with.
"""
workflow_graph = WorkflowGraph(working_directory)
for property_to_calculate in properties:
if not isinstance(property_to_calculate, IReweightable) or not isinstance(
property_to_calculate, IWorkflowProperty
):
# Only properties which implement the IReweightable and
# IWorkflowProperty interfaces can be reweighted
continue
property_type = type(property_to_calculate).__name__
if property_type not in options.workflow_schemas:
logging.warning(
"The reweighting layer does not support {} "
"workflows.".format(property_type)
)
continue
if ReweightingLayer.__name__ not in options.workflow_schemas[property_type]:
continue
schema = options.workflow_schemas[property_type][ReweightingLayer.__name__]
workflow_options = options.workflow_options[property_type].get(
ReweightingLayer.__name__
)
global_metadata = Workflow.generate_default_metadata(
property_to_calculate,
target_force_field_path,
parameter_gradient_keys,
workflow_options,
)
substance_id = property_to_calculate.substance.identifier
data_class_type = property_to_calculate.required_data_class
if (
substance_id not in stored_data_paths
or data_class_type not in stored_data_paths[substance_id]
):
# We haven't found and cached data which is compatible with this property.
continue
global_metadata["full_system_data"] = stored_data_paths[substance_id][
data_class_type
]
global_metadata["component_data"] = []
if property_to_calculate.multi_component_property:
has_data_for_property = True
for component in property_to_calculate.substance.components:
temporary_substance = Substance()
temporary_substance.add_component(
component, amount=Substance.MoleFraction()
)
if (
temporary_substance.identifier not in stored_data_paths
or data_class_type
not in stored_data_paths[temporary_substance.identifier]
):
has_data_for_property = False
break
global_metadata["component_data"].append(
stored_data_paths[temporary_substance.identifier][
data_class_type
]
)
if not has_data_for_property:
continue
workflow = Workflow(property_to_calculate, global_metadata)
workflow.schema = schema
from propertyestimator.properties import CalculationSource
workflow.physical_property.source = CalculationSource(
fidelity=ReweightingLayer.__name__, provenance={}
)
workflow_graph.add_workflow(workflow)
return workflow_graph