"""
Defines the base API for defining new property estimator estimation layers.
"""
import json
import logging
import traceback
from os import path
from propertyestimator.storage.dataclasses import StoredDataCollection
from propertyestimator.utils.exceptions import PropertyEstimatorException
from propertyestimator.utils.serialization import TypedJSONDecoder
available_layers = {}
[docs]def register_calculation_layer():
"""A decorator which registers a class as being a calculation layer
which may be used in property calculations.
See Also
--------
TODO: add documentation for plugin support
"""
def decorator(cls):
if cls.__name__ in available_layers:
raise ValueError('The {} layer is already registered.'.format(cls.__name__))
available_layers[cls.__name__] = cls
return cls
return decorator
def return_args(*args, **kwargs):
return args
class CalculationLayerResult:
"""The output returned from attempting to calculate a property on
a PropertyCalculationLayer.
Attributes
----------
property_id: str
The unique id of the original physical property that this
calculation layer attempts to estimate.
calculated_property: PhysicalProperty, optional
The property which was estimated by this layer. The will
be `None` if the layer could not estimate the property.
exception: PropertyEstimatorException, optional
The exception which was raised when estimating the property
of interest, if any.
data_to_store: list of tuple of str and str
A list of pairs of a path to a JSON serialized `BaseStoredData`
object, and the path to the corresponding data directory.
"""
def __init__(self):
"""Constructs a new CalculationLayerResult object.
"""
self.property_id = None
self.calculated_property = None
self.exception = None
self.data_to_store = []
def __getstate__(self):
return {
'property_id': self.property_id,
'calculated_property': self.calculated_property,
'exception': self.exception,
'data_to_store': self.data_to_store
}
def __setstate__(self, state):
self.property_id = state['property_id']
self.calculated_property = state['calculated_property']
self.exception = state['exception']
self.data_to_store = state['data_to_store']
[docs]class PropertyCalculationLayer:
"""An abstract representation of a calculation layer whose goal is
to estimate a set of physical properties using a single approach,
such as a layer which employs direct simulations to estimate properties,
or one which reweights cached simulation data to the same end.
Notes
-----
Calculation layers must inherit from this class, and must override the
`schedule_calculation` method.
See Also
--------
TODO: Link to a general page outlining what calculation layers are
and how they are used.
"""
@staticmethod
def _await_results(calculation_backend, storage_backend, layer_directory, server_request,
callback, submitted_futures, synchronous=False):
"""A helper method to handle passing the results of this layer back to
the main thread.
Parameters
----------
calculation_backend: PropertyEstimatorBackend
The backend to the submit the calculations to.
storage_backend: PropertyEstimatorStorage
The backend used to store / retrieve data from previous calculations.
layer_directory: str
The local directory in which to store all local, temporary calculation data from this layer.
server_request: PropertyEstimatorServer.ServerEstimationRequest
The request object which spawned the awaited results.
callback: function
The function to call when the backend returns the results (or an error).
submitted_futures: list(dask.distributed.Future)
A list of the futures returned by the backed when submitting the calculation.
synchronous: bool
If true, this function will block until the calculation has completed.
"""
callback_future = calculation_backend.submit_task(return_args,
*submitted_futures,
key=f'return_{server_request.id}')
def callback_wrapper(results_future):
PropertyCalculationLayer._process_results(results_future, server_request, storage_backend, callback)
if synchronous:
callback_wrapper(callback_future)
else:
callback_future.add_done_callback(callback_wrapper)
@staticmethod
def _process_results(results_future, server_request, storage_backend, callback):
"""Processes the results of a calculation layer, updates the server request,
then passes it back to the callback ready for propagation to the next layer
in the stack.
Parameters
----------
results_future: distributed.Future
The future object which will hold the results.
server_request: PropertyEstimatorServer.ServerEstimationRequest
The request object which spawned the awaited results.
storage_backend: PropertyEstimatorStorage
The backend used to store / retrieve data from previous calculations.
callback: function
The function to call when the backend returns the results (or an error).
"""
# Wrap everything in a try catch to make sure the whole calculation backend /
# server doesn't go down when an unexpected exception occurs.
try:
results = list(results_future.result())
results_future.release()
for returned_output in results:
if returned_output is None:
# Indicates the layer could not calculate this
# particular property.
continue
if not isinstance(returned_output, CalculationLayerResult):
# Make sure we are actually dealing with the object we expect.
raise ValueError('The output of the calculation was not '
'a CalculationLayerResult as expected.')
if returned_output.exception is not None:
# If an exception was raised, make sure to add it to the list.
server_request.exceptions.append(returned_output.exception)
logging.info(f'An exception was raised: '
f'{returned_output.exception.directory} - '
f'{returned_output.exception.message}')
else:
# Make sure to store any important calculation data if no exceptions
# were thrown.
if (returned_output.data_to_store is not None and
returned_output.calculated_property is not None):
for data_tuple in returned_output.data_to_store:
data_object_path, data_directory_path = data_tuple
# Make sure the data directory / file to store actually exists
if not path.isdir(data_directory_path) or not path.isfile(data_object_path):
logging.info(f'Invalid data directory ({data_directory_path}) / '
f'file ({data_object_path})')
continue
# Attach any extra metadata which is missing.
with open(data_object_path, 'r') as file:
data_object = json.load(file, cls=TypedJSONDecoder)
if data_object.force_field_id is None:
data_object.force_field_id = server_request.force_field_id
if isinstance(data_object, StoredDataCollection):
for inner_data_object in data_object.data.values():
if inner_data_object.force_field_id is None:
inner_data_object.force_field_id = server_request.force_field_id
storage_backend.store_simulation_data(data_object, data_directory_path)
matches = [x for x in server_request.queued_properties if x.id == returned_output.property_id]
if len(matches) > 1:
raise ValueError(f'A property id ({returned_output.property_id}) conflict occurred.')
elif len(matches) == 0:
logging.info('A calculation layer returned results for a property not in the '
'queue. This sometimes and expectedly occurs when using queue based '
'calculation backends, but should be investigated.')
continue
if returned_output.calculated_property is None:
if returned_output.exception is None:
logging.info('A calculation layer did not return an estimated property nor did it '
'raise an Exception. This sometimes and expectedly occurs when using '
'queue based calculation backends, but should be investigated.')
continue
if returned_output.exception is not None:
continue
for match in matches:
server_request.queued_properties.remove(match)
substance_id = returned_output.calculated_property.substance.identifier
if substance_id not in server_request.estimated_properties:
server_request.estimated_properties[substance_id] = []
server_request.estimated_properties[substance_id].append(returned_output.calculated_property)
except Exception as e:
logging.info(f'Error processing layer results for request {server_request.id}')
formatted_exception = traceback.format_exception(None, e, e.__traceback__)
exception = PropertyEstimatorException(message='An unhandled internal exception '
'occurred: {}'.format(formatted_exception))
server_request.exceptions.append(exception)
callback(server_request)
[docs] @staticmethod
def schedule_calculation(calculation_backend, storage_backend, layer_directory,
data_model, callback, synchronous=False):
"""Submit the proposed calculation to the backend of choice.
Parameters
----------
calculation_backend: PropertyEstimatorBackend
The backend to the submit the calculations to.
storage_backend: PropertyEstimatorStorage
The backend used to store / retrieve data from previous calculations.
layer_directory: str
The local directory in which to store all local, temporary calculation data from this layer.
data_model: PropertyEstimatorServer.ServerEstimationRequest
The data model encoding the proposed calculation.
callback: function
The function to call when the backend returns the results (or an error).
synchronous: bool
If true, this function will block until the calculation has completed.
This is mainly intended for debugging purposes.
"""
raise NotImplementedError()