Source code for propertyestimator.server

"""
Property calculator 'server' side API.
"""
import copy
import json
import logging
import uuid
from os import path, makedirs

from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.tcpserver import TCPServer

from propertyestimator.client import PropertyEstimatorSubmission, PropertyEstimatorResult, PropertyEstimatorOptions
from propertyestimator.layers import available_layers
from propertyestimator.utils.exceptions import PropertyEstimatorException
from propertyestimator.utils.serialization import TypedBaseModel
from propertyestimator.utils.tcp import PropertyEstimatorMessageTypes, pack_int, unpack_int


class PropertyEstimatorServer(TCPServer):
    """The object responsible for coordinating all property estimations to be
    run using the property estimator, in addition to deciding at which fidelity
    a property will be calculated.

    It acts as a server, which receives submitted jobs from clients
    launched via the property estimator.

    Warnings
    --------
    This class is still heavily under development and is subject to rapid changes.

    Notes
    -----
    Methods to handle the TCP messages are based on the StackOverflow response from
    A. Jesse Jiryu Davis: https://stackoverflow.com/a/40257248

    Examples
    --------
    Setting up a general server instance using a dask LocalCluster backend:

    >>> # Create the backend which will be responsible for distributing the calculations
    >>> from propertyestimator.backends import DaskLocalCluster, ComputeResources
    >>> calculation_backend = DaskLocalCluster(1)
    >>>
    >>> # Create the backend which will be responsible for storing and retrieving
    >>> # the data from previous calculations
    >>> from propertyestimator.storage import LocalFileStorage
    >>> storage_backend = LocalFileStorage()
    >>>
    >>> # Create the server to which all estimation requests will be submitted
    >>> from propertyestimator.server import PropertyEstimatorServer
    >>> property_server = PropertyEstimatorServer(calculation_backend, storage_backend)
    >>>
    >>> # Instruct the server to listen for incoming requests
    >>> property_server.start_listening_loop()
    """

    class ServerEstimationRequest(TypedBaseModel):
        """Represents a request for the server to estimate a set of properties.
        Such requests are expected to only estimate properties for a single
        system (e.g. fixed components in a fixed ratio).
        """

        def __init__(self, estimation_id='', queued_properties=None, options=None,
                     force_field_id=None, parameter_gradient_keys=None):
            """Constructs a new ServerEstimationRequest object.

            Parameters
            ----------
            estimation_id: str
                A unique id assigned to this estimation request.
            queued_properties: list of PhysicalProperty, optional
                A list of physical properties waiting to be estimated.
            options: PropertyEstimatorOptions, optional
                The options used to estimate the properties.
            force_field_id: str
                The unique server side id of the force field parameters
                used to estimate the properties.
            parameter_gradient_keys: list of ParameterGradientKey
                A list of references to all of the parameters which all
                observables should be differentiated with respect to.
            """
            self.id = estimation_id

            self.queued_properties = queued_properties or []

            self.estimated_properties = {}
            self.unsuccessful_properties = {}

            self.exceptions = []

            self.options = options

            self.force_field_id = force_field_id
            self.parameter_gradient_keys = parameter_gradient_keys

        def __getstate__(self):
            return {
                'id': self.id,

                'queued_properties': self.queued_properties,
                'estimated_properties': self.estimated_properties,
                'unsuccessful_properties': self.unsuccessful_properties,

                'exceptions': self.exceptions,

                'options': self.options,

                'force_field_id': self.force_field_id,
                'parameter_gradient_keys': self.parameter_gradient_keys
            }

        def __setstate__(self, state):
            self.id = state['id']

            self.queued_properties = state['queued_properties']
            self.estimated_properties = state['estimated_properties']
            self.unsuccessful_properties = state['unsuccessful_properties']

            self.exceptions = state['exceptions']

            self.options = state['options']

            self.force_field_id = state['force_field_id']
            self.parameter_gradient_keys = state['parameter_gradient_keys']

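    # For reference, `__getstate__` above serializes a request to a plain
    # mapping with the keys outlined below (values elided). Assuming that
    # `TypedBaseModel.json()` is backed by this state, this is also the form
    # which `_find_server_estimation_request` compares when checking for
    # duplicate requests:
    #
    #     {'id': ..., 'queued_properties': ..., 'estimated_properties': ...,
    #      'unsuccessful_properties': ..., 'exceptions': ..., 'options': ...,
    #      'force_field_id': ..., 'parameter_gradient_keys': ...}
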
    def __init__(self, calculation_backend, storage_backend,
                 port=8000, working_directory='working-data'):
        """Constructs a new PropertyEstimatorServer object.

        Parameters
        ----------
        calculation_backend: PropertyEstimatorBackend
            The backend to use for executing calculations.
        storage_backend: PropertyEstimatorStorage
            The backend to use for storing information from any calculations.
        port: int
            The port on which to listen for incoming client requests.
        working_directory: str
            The local directory in which to store all local,
            temporary calculation data.
        """
        assert calculation_backend is not None and storage_backend is not None

        self._calculation_backend = calculation_backend
        self._storage_backend = storage_backend

        self._port = port

        self._working_directory = working_directory

        if not path.isdir(self._working_directory):
            makedirs(self._working_directory)

        self._queued_calculations = {}
        self._finished_calculations = {}

        # Each client request id (i.e. an id relating to a client requesting
        # that an entire data set of properties is estimated) is matched to
        # a set of server side request ids.
        #
        # The main difference is that on the server, a request to estimate
        # an entire data set is split into multiple requests to estimate
        # properties per substance.
        self._server_request_ids_per_client_id = {}

        super().__init__()

        self.bind(self._port)
        self.start(1)

        calculation_backend.start()

    async def _handle_job_submission(self, stream, address, message_length):
        """An asynchronous routine for handling the receiving and processing
        of job submissions from a client.

        Parameters
        ----------
        stream: IOStream
            An IO stream used to pass messages between the
            server and client.
        address: str
            The address from which the request came.
        message_length: int
            The length of the message being received.
        """
        logging.info('Received estimation request from {}'.format(address))

        # Read the incoming request from the client (the four byte length
        # header has already been consumed by `handle_stream`).
        # Decode the client submission json.
        encoded_json = await stream.read_bytes(message_length)
        json_model = encoded_json.decode()

        # TODO: Add exception handling so the server can gracefully reject bad json.
        client_data_model = PropertyEstimatorSubmission.parse_json(json_model)

        client_request_id = str(uuid.uuid4())

        while client_request_id in self._server_request_ids_per_client_id:
            client_request_id = str(uuid.uuid4())

        self._server_request_ids_per_client_id[client_request_id] = []

        # Pass the ids of the submitted requests back to the
        # client.
        encoded_job_ids = json.dumps(client_request_id).encode()
        length = pack_int(len(encoded_job_ids))

        await stream.write(length + encoded_job_ids)

        logging.info('Request id sent to the client ({}): {}'.format(address, client_request_id))

        server_requests, request_ids_to_launch = self._prepare_server_requests(client_data_model,
                                                                                client_request_id)

        # Launch any of the requests which are not already queued or
        # have already been completed.
        for request_id in request_ids_to_launch:
            self._schedule_server_request(server_requests[request_id])

    async def _handle_job_query(self, stream, message_length):
        """An asynchronous routine for handling the receiving and processing
        of job queries from a client.

        Parameters
        ----------
        stream: IOStream
            An IO stream used to pass messages between the
            server and client.
        message_length: int
            The length of the message being received.
        """
        encoded_request_id = await stream.read_bytes(message_length)
        client_request_id = encoded_request_id.decode()

        response = None

        if client_request_id not in self._server_request_ids_per_client_id:

            response = PropertyEstimatorException(directory='',
                                                  message=f'The {client_request_id} request id was not found '
                                                          f'on the server.')

        else:
            response = self._query_client_request_status(client_request_id)

        encoded_response = response.json().encode()
        length = pack_int(len(encoded_response))

        await stream.write(length + encoded_response)

    async def handle_stream(self, stream, address):
        """A routine to handle incoming requests from
        a property estimator TCP client.

        Notes
        -----
        This method is based on the StackOverflow response from
        A. Jesse Jiryu Davis: https://stackoverflow.com/a/40257248

        Parameters
        ----------
        stream: IOStream
            An IO stream used to pass messages between the
            server and client.
        address: str
            The address from which the request came.
        """
        # logging.info("Incoming connection from {}".format(address))

        try:
            while True:

                # Receive an introductory message with the message type.
                packed_message_type = await stream.read_bytes(4)
                message_type_int = unpack_int(packed_message_type)[0]

                packed_message_length = await stream.read_bytes(4)
                message_length = unpack_int(packed_message_length)[0]

                # logging.info('Introductory packet received: {} {}'.format(message_type_int, message_length))

                message_type = None

                try:
                    message_type = PropertyEstimatorMessageTypes(message_type_int)
                    # logging.info('Message type: {}'.format(message_type))

                except ValueError as e:

                    logging.info('Bad message type received: {}'.format(e))

                    # Discard the unrecognised message.
                    if message_length > 0:
                        await stream.read_bytes(message_length)

                    continue

                if message_type is PropertyEstimatorMessageTypes.Submission:
                    await self._handle_job_submission(stream, address, message_length)
                elif message_type is PropertyEstimatorMessageTypes.Query:
                    await self._handle_job_query(stream, message_length)

        except StreamClosedError:
            # Handle client disconnections gracefully.
            # logging.info("Lost connection to {}:{} : {}.".format(address, self._port, e))
            pass

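    # To summarise the framing handled by `handle_stream` and the two handlers
    # above (a description of the behaviour in this module, not a formal
    # protocol specification):
    #
    #     client -> server : packed message type + packed message length + JSON payload
    #     server -> client : packed response length + JSON response
    #
    # so a client submitting an estimation request would be expected to write
    # something along the lines of:
    #
    #     payload = submission.json().encode()
    #     header = (pack_int(PropertyEstimatorMessageTypes.Submission.value) +
    #               pack_int(len(payload)))
    #     stream.write(header + payload)
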
    def _find_server_estimation_request(self, request):
        """Checks whether the server is currently, or has previously completed
        a request to estimate a set of properties for a particular substance
        using the same force field parameters and estimation options.

        Parameters
        ----------
        request: PropertyEstimatorServer.ServerEstimationRequest
            The request to check for.

        Returns
        -------
        str, optional
            The id of the existing request if one exists, otherwise None.
        """
        cached_request_id = request.id

        for existing_id in self._queued_calculations:

            request.id = existing_id

            if request.json() != self._queued_calculations[existing_id].json():
                continue

            request.id = cached_request_id
            return existing_id

        for existing_id in self._finished_calculations:

            request.id = existing_id

            if request.json() != self._finished_calculations[existing_id].json():
                continue

            request.id = cached_request_id
            return existing_id

        request.id = cached_request_id
        return None

    def _prepare_server_requests(self, client_data_model, client_request_id):
        """Turns a client estimation submission request into a form more useful
        to the server, namely a list of properties to estimate separated by
        system composition.

        Parameters
        ----------
        client_data_model: PropertyEstimatorSubmission
            The client data model.
        client_request_id: str
            The id that was assigned to the client request.

        Returns
        -------
        dict of str and PropertyEstimatorServer.ServerEstimationRequest
            The requests to be calculated by the server, keyed by their unique ids.
        list of str
            The ids of the requests which haven't already been launched by
            the server.
        """
        force_field = client_data_model.force_field

        force_field_id = self._storage_backend.has_force_field(force_field)

        if force_field_id is None:
            force_field_id = self._storage_backend.store_force_field(force_field)

        server_requests = {}

        # Split the full list of properties into lists partitioned by
        # substance.
        properties_by_substance = {}

        for physical_property in client_data_model.properties:

            if physical_property.substance.identifier not in properties_by_substance:
                properties_by_substance[physical_property.substance.identifier] = []

            properties_by_substance[physical_property.substance.identifier].append(physical_property)

        for substance_identifier in properties_by_substance:

            calculation_id = str(uuid.uuid4())

            # Make sure we don't somehow generate the same uuid
            # twice (although this is very unlikely to ever happen).
            while (calculation_id in self._queued_calculations or
                   calculation_id in self._finished_calculations):

                calculation_id = str(uuid.uuid4())

            properties_to_estimate = properties_by_substance[substance_identifier]

            options_copy = PropertyEstimatorOptions.parse_json(client_data_model.options.json())
            parameter_gradient_keys = copy.deepcopy(client_data_model.parameter_gradient_keys)

            request = self.ServerEstimationRequest(estimation_id=calculation_id,
                                                   queued_properties=properties_to_estimate,
                                                   options=options_copy,
                                                   force_field_id=force_field_id,
                                                   parameter_gradient_keys=parameter_gradient_keys)

            server_requests[calculation_id] = request

        request_ids_to_launch = []

        # Make sure this request is not already in the queue / has
        # already been completed, and if not add it to the list of
        # things to be queued.
        for server_request_id in server_requests:

            server_request = server_requests[server_request_id]

            existing_id = self._find_server_estimation_request(server_request)

            if existing_id is None:

                request_ids_to_launch.append(server_request_id)
                existing_id = server_request_id

                self._queued_calculations[server_request_id] = server_request

            self._server_request_ids_per_client_id[client_request_id].append(existing_id)

        return server_requests, request_ids_to_launch

    def _query_client_request_status(self, client_request_id):
        """Queries the current status of a client request by querying
        the state of the individual server requests it was split into.

        Parameters
        ----------
        client_request_id: str
            The id of the client request to query.

        Returns
        -------
        PropertyEstimatorResult
            The current results of the client request.
        """
        request_results = PropertyEstimatorResult(result_id=client_request_id)

        for server_request_id in self._server_request_ids_per_client_id[client_request_id]:

            server_request = None

            if server_request_id in self._queued_calculations:
                server_request = self._queued_calculations[server_request_id]

            elif server_request_id in self._finished_calculations:

                server_request = self._finished_calculations[server_request_id]

                if len(server_request.queued_properties) > 0:

                    return PropertyEstimatorException(message=f'An internal error occurred - the {server_request_id} '
                                                              f'request was prematurely marked as finished.')

            else:

                return PropertyEstimatorException(message=f'An internal error occurred - the {server_request_id} '
                                                          f'request was not found on the server.')

            for physical_property in server_request.queued_properties:

                substance_id = physical_property.substance.identifier

                if substance_id not in request_results.queued_properties:
                    request_results.queued_properties[substance_id] = []

                request_results.queued_properties[substance_id].append(physical_property)

            for substance_id in server_request.unsuccessful_properties:

                physical_property = server_request.unsuccessful_properties[substance_id]

                if substance_id not in request_results.unsuccessful_properties:
                    request_results.unsuccessful_properties[substance_id] = []

                request_results.unsuccessful_properties[substance_id].append(physical_property)

            for substance_id in server_request.estimated_properties:

                physical_properties = server_request.estimated_properties[substance_id]

                if substance_id not in request_results.estimated_properties:
                    request_results.estimated_properties[substance_id] = []

                request_results.estimated_properties[substance_id].extend(physical_properties)

            request_results.exceptions.extend(server_request.exceptions)

        return request_results

    def _schedule_server_request(self, server_request):
        """Schedules the estimation of the requested properties.

        This method will recursively cascade through all allowed calculation
        layers or until all properties have been calculated.

        Parameters
        ----------
        server_request : PropertyEstimatorServer.ServerEstimationRequest
            The object containing instructions about which calculations
            should be performed.
        """
        if len(server_request.options.allowed_calculation_layers) == 0 or \
           len(server_request.queued_properties) == 0:

            # Move any remaining properties to the unsuccessful list.
            for physical_property in server_request.queued_properties:

                substance_id = physical_property.substance.identifier

                if substance_id not in server_request.unsuccessful_properties:
                    server_request.unsuccessful_properties[substance_id] = []

                server_request.unsuccessful_properties[substance_id].append(physical_property)

            server_request.queued_properties = []

            self._queued_calculations.pop(server_request.id)
            self._finished_calculations[server_request.id] = server_request

            logging.info(f'Finished server request {server_request.id}')
            return

        current_layer_type = server_request.options.allowed_calculation_layers.pop(0)

        if current_layer_type not in available_layers:

            # Kill all remaining properties if we reach an unsupported calculation layer.
            error_object = PropertyEstimatorException(message=f'The {current_layer_type} layer is not '
                                                              f'supported by / available on the server.')

            server_request.exceptions.append(error_object)

            server_request.options.allowed_calculation_layers.append(current_layer_type)
            server_request.queued_properties = []

            self._schedule_server_request(server_request)
            return

        logging.info(f'Launching server request {server_request.id} using the {current_layer_type} layer')

        layer_directory = path.join(self._working_directory, current_layer_type, server_request.id)

        if not path.isdir(layer_directory):
            makedirs(layer_directory)

        current_layer = available_layers[current_layer_type]

        current_layer.schedule_calculation(self._calculation_backend,
                                           self._storage_backend,
                                           layer_directory,
                                           server_request,
                                           self._schedule_server_request)

    def start_listening_loop(self):
        """Starts the main (blocking) server IOLoop which will run until
        the user kills the process.
        """
        logging.info('Server listening at port {}'.format(self._port))

        try:
            IOLoop.current().start()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stops the property calculation server and its provided backend.
        """
        self._calculation_backend.stop()
        IOLoop.current().stop()
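
A minimal usage sketch, following the example in the class docstring above; the
non-default ``port`` value below is purely illustrative, and ``working_directory``
is simply the documented default passed explicitly:

>>> from propertyestimator.backends import DaskLocalCluster
>>> from propertyestimator.storage import LocalFileStorage
>>> from propertyestimator.server import PropertyEstimatorServer
>>>
>>> calculation_backend = DaskLocalCluster(1)
>>> storage_backend = LocalFileStorage()
>>>
>>> property_server = PropertyEstimatorServer(calculation_backend,
...                                           storage_backend,
...                                           port=8001,
...                                           working_directory='working-data')
>>>
>>> # Blocks until interrupted; `stop()` shuts down the calculation
>>> # backend and halts the IOLoop.
>>> property_server.start_listening_loop()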