# Source code for propertyestimator.layers.reweighting

"""
The simulation reweighting estimation layer.
"""
import abc
import json
import logging
import os

from propertyestimator.layers import register_calculation_layer, PropertyCalculationLayer
from propertyestimator.substances import Substance
from propertyestimator.utils.serialization import TypedJSONEncoder
from propertyestimator.utils.utils import SubhookedABCMeta
from propertyestimator.workflow import WorkflowGraph, Workflow
from propertyestimator.workflow.workflow import IWorkflowProperty


class IReweightable(SubhookedABCMeta):
    """The interface which a physical property must implement in order
    for it to be estimated by reweighting previously stored data.
    """

    @property
    @abc.abstractmethod
    def multi_component_property(self):
        """bool: Whether this property depends only on the properties of
        the full mixed substance (`False`), or additionally on the properties
        of each of the individual components of the substance (`True`).
        """

    @property
    @abc.abstractmethod
    def required_data_class(self):
        """subclass of BaseStoredData: The class of stored data which is
        needed in order to reweight this property (e.g. `StoredSimulationData`).
        """


@register_calculation_layer()
class ReweightingLayer(PropertyCalculationLayer):
    """A calculation layer which aims to calculate physical properties by
    reweighting the results of previous calculations.

    .. warning :: This class is still heavily under development and is subject to
                 rapid changes.
    """

    @staticmethod
    def schedule_calculation(calculation_backend, storage_backend, layer_directory,
                             data_model, callback, synchronous=False):
        """Schedule the reweighting of the properties queued on a data model.

        A local copy of the target force field, and of any stored data which
        may be reweighted, is made in `layer_directory`. A graph of reweighting
        workflows is then built and submitted to the calculation backend, and
        the resulting futures are handed to
        `PropertyCalculationLayer._await_results`.

        Parameters
        ----------
        calculation_backend
            The backend which the workflow graph is submitted to.
        storage_backend: PropertyEstimatorStorage
            The storage backend to retrieve the force fields and the
            stored data from.
        layer_directory: str
            The directory in which to store the files created by this layer.
        data_model
            The object describing the calculation to perform. Its
            `force_field_id`, `queued_properties`, `parameter_gradient_keys`
            and `options` attributes are read by this method.
        callback
            Passed unchanged to `PropertyCalculationLayer._await_results`.
        synchronous: bool
            Passed unchanged to `PropertyCalculationLayer._await_results`.
        """

        # Make a local copy of the target force field.
        target_force_field = storage_backend.retrieve_force_field(data_model.force_field_id)

        target_force_field_path = os.path.join(layer_directory, data_model.force_field_id)
        target_force_field.to_file(target_force_field_path,
                                   io_format='XML',
                                   discard_cosmetic_attributes=False)

        stored_data_paths = ReweightingLayer._retrieve_stored_data(data_model.queued_properties,
                                                                   storage_backend,
                                                                   layer_directory)

        workflow_graph = ReweightingLayer._build_workflow_graph(layer_directory,
                                                                data_model.queued_properties,
                                                                target_force_field_path,
                                                                stored_data_paths,
                                                                data_model.parameter_gradient_keys,
                                                                data_model.options)

        reweighting_futures = workflow_graph.submit(calculation_backend)

        PropertyCalculationLayer._await_results(calculation_backend,
                                                storage_backend,
                                                layer_directory,
                                                data_model,
                                                callback,
                                                reweighting_futures,
                                                synchronous)

    @staticmethod
    def _retrieve_stored_data(physical_properties, storage_backend, layer_directory):
        """Extract all of the stored data from the backend which may be
        used in reweighting, saving local copies into the layer directory.

        Parameters
        ----------
        physical_properties: list of PhysicalProperty
            The physical properties to attempt to estimate.
        storage_backend: PropertyEstimatorStorage
            The storage backend to retrieve the data from.
        layer_directory: str
            The directory in which to store the retrieved data.

        Returns
        -------
        dict of str and dict of type and list of tuple(str, str, str)
            A dictionary partitioned first by substance identifier and then
            by the type of the data class, whose values are lists of tuples
            of the path to a local copy of a stored data object, its ancillary
            data directory, and the path to its corresponding force field.
        """

        data_paths = {}

        for physical_property in physical_properties:

            if not isinstance(physical_property, IReweightable):
                # Only properties which implement the IReweightable
                # interface can be reweighted
                continue

            existing_data = storage_backend.retrieve_simulation_data(
                physical_property.substance,
                physical_property.multi_component_property,
                physical_property.required_data_class)

            if not existing_data:
                continue

            # Take data from the storage backend and save it in the working directory.
            for substance_id in existing_data:

                # Register the substance id with the return dictionary
                paths_by_type = data_paths.setdefault(substance_id, {})

                for data_object, data_directory in existing_data[substance_id]:

                    # Register this object's data class type with the
                    # return dictionary
                    path_tuples = paths_by_type.setdefault(type(data_object), [])

                    data_object_path = os.path.join(layer_directory,
                                                    f'{os.path.basename(data_directory)}.json')

                    # Save a local copy of the data object file.
                    if not os.path.isfile(data_object_path):

                        with open(data_object_path, 'w') as file:
                            json.dump(data_object, file, cls=TypedJSONEncoder)

                    force_field_path = os.path.join(layer_directory, data_object.force_field_id)
                    path_tuple = (data_object_path, data_directory, force_field_path)

                    # The same data may be returned for more than one of
                    # the properties, so only register it once.
                    if path_tuple in path_tuples:
                        continue

                    # Save a local copy of the force field file if one
                    # does not already exist.
                    if not os.path.isfile(force_field_path):

                        existing_force_field = storage_backend.retrieve_force_field(
                            data_object.force_field_id)

                        existing_force_field.to_file(force_field_path,
                                                     io_format='XML',
                                                     discard_cosmetic_attributes=False)

                    path_tuples.append(path_tuple)

        return data_paths

    @staticmethod
    def _find_component_data(substance, data_class_type, stored_data_paths):
        """Collect the stored data of each individual component of a substance.

        Parameters
        ----------
        substance: Substance
            The substance whose components should be looked up.
        data_class_type: type
            The type of stored data which is required.
        stored_data_paths: dict of str and dict of type and list of tuple(str, str, str)
            The stored data, in the layout returned by `_retrieve_stored_data`.

        Returns
        -------
        list of list of tuple(str, str, str), optional
            One entry per component, in the order of `substance.components`,
            or `None` if any component has no stored data of the required type.
        """

        component_data = []

        for component in substance.components:

            # Each component is looked up under the identifier of a
            # substance which contains only that component.
            temporary_substance = Substance()
            temporary_substance.add_component(component, amount=Substance.MoleFraction())

            component_id = temporary_substance.identifier

            if (component_id not in stored_data_paths or
                    data_class_type not in stored_data_paths[component_id]):

                return None

            component_data.append(stored_data_paths[component_id][data_class_type])

        return component_data

    @staticmethod
    def _build_workflow_graph(working_directory, properties, target_force_field_path,
                              stored_data_paths, parameter_gradient_keys, options):
        """Construct a workflow graph, containing all of the workflows which
        should be followed to estimate a set of properties by reweighting.

        Properties which do not implement both the `IReweightable` and
        `IWorkflowProperty` interfaces, which have no reweighting schema
        in `options`, or for which no compatible stored data is available
        are skipped.

        Parameters
        ----------
        working_directory: str
            The local directory in which to store all local,
            temporary calculation data from this graph.
        properties : list of PhysicalProperty
            The properties to attempt to compute.
        target_force_field_path : str
            The path to the target force field parameters to use in the workflow.
        stored_data_paths: dict of str and dict of type and list of tuple(str, str, str)
            A dictionary partitioned first by substance identifier and then
            by the type of the data class, whose values are lists of tuples
            of the path to a stored data object, its ancillary data directory,
            and the path to its corresponding force field (i.e. the value
            returned by `_retrieve_stored_data`).
        parameter_gradient_keys: list of ParameterGradientKey
            A list of references to all of the parameters which all observables
            should be differentiated with respect to.
        options: PropertyEstimatorOptions
            The options to run the workflows with.

        Returns
        -------
        WorkflowGraph
            The graph containing one workflow per property which can be reweighted.
        """

        # Imported inside the function rather than at module level, as in the
        # original code (presumably to avoid a circular import - TODO confirm).
        # It is loop invariant, so it is only performed once.
        from propertyestimator.properties import CalculationSource

        layer_name = ReweightingLayer.__name__
        workflow_graph = WorkflowGraph(working_directory)

        for property_to_calculate in properties:

            if (not isinstance(property_to_calculate, IReweightable) or
                    not isinstance(property_to_calculate, IWorkflowProperty)):

                # Only properties which implement the IReweightable and
                # IWorkflowProperty interfaces can be reweighted
                continue

            property_type = type(property_to_calculate).__name__

            if property_type not in options.workflow_schemas:

                logging.warning('The reweighting layer does not support {} '
                                'workflows.'.format(property_type))

                continue

            if layer_name not in options.workflow_schemas[property_type]:
                continue

            schema = options.workflow_schemas[property_type][layer_name]
            workflow_options = options.workflow_options[property_type].get(layer_name)

            global_metadata = Workflow.generate_default_metadata(property_to_calculate,
                                                                 target_force_field_path,
                                                                 parameter_gradient_keys,
                                                                 workflow_options)

            substance_id = property_to_calculate.substance.identifier
            data_class_type = property_to_calculate.required_data_class

            if (substance_id not in stored_data_paths or
                    data_class_type not in stored_data_paths[substance_id]):

                # We haven't found and cached data which is compatible with this property.
                continue

            component_data = []

            if property_to_calculate.multi_component_property:

                # Data is also needed for each of the individual components.
                component_data = ReweightingLayer._find_component_data(
                    property_to_calculate.substance, data_class_type, stored_data_paths)

                if component_data is None:
                    continue

            global_metadata['full_system_data'] = stored_data_paths[substance_id][data_class_type]
            global_metadata['component_data'] = component_data

            workflow = Workflow(property_to_calculate, global_metadata)
            workflow.schema = schema

            # Record that the value of this property will come from this layer.
            workflow.physical_property.source = CalculationSource(fidelity=layer_name,
                                                                  provenance={})

            workflow_graph.add_workflow(workflow)

        return workflow_graph