Source code for propertyestimator.protocols.simulation

"""
A collection of protocols for running molecular simulations.
"""
import io
import json
import logging
import os
import re
import traceback

import pandas as pd
from simtk import openmm
from simtk import unit as simtk_unit
from simtk.openmm import app

from propertyestimator import unit
from propertyestimator.thermodynamics import Ensemble, ThermodynamicState
from propertyestimator.utils.exceptions import PropertyEstimatorException
from propertyestimator.utils.openmm import (
    disable_pbc,
    pint_quantity_to_openmm,
    setup_platform_with_resources,
)
from propertyestimator.utils.serialization import TypedJSONDecoder, TypedJSONEncoder
from propertyestimator.utils.statistics import ObservableType, StatisticsArray
from propertyestimator.workflow.decorators import (
    UNDEFINED,
    InequalityMergeBehaviour,
    protocol_input,
    protocol_output,
)
from propertyestimator.workflow.plugins import register_calculation_protocol
from propertyestimator.workflow.protocols import BaseProtocol


[docs]@register_calculation_protocol() class RunEnergyMinimisation(BaseProtocol): """A protocol to minimise the potential energy of a system. """ input_coordinate_file = protocol_input( docstring="The coordinates to minimise.", type_hint=str, default_value=UNDEFINED ) system_path = protocol_input( docstring="The path to the XML system object which defines the forces present " "in the system.", type_hint=str, default_value=UNDEFINED, ) tolerance = protocol_input( docstring="The energy tolerance to which the system should be minimized.", type_hint=unit.Quantity, default_value=10 * unit.kilojoules / unit.mole, ) max_iterations = protocol_input( docstring="The maximum number of iterations to perform. If this is 0, " "minimization is continued until the results converge without regard to " "how many iterations it takes.", type_hint=int, default_value=0, ) enable_pbc = protocol_input( docstring="If true, periodic boundary conditions will be enabled.", type_hint=bool, default_value=True, ) output_coordinate_file = protocol_output( docstring="The file path to the minimised coordinates.", type_hint=str )
[docs] def execute(self, directory, available_resources): logging.info("Minimising energy: " + self.id) platform = setup_platform_with_resources(available_resources) input_pdb_file = app.PDBFile(self.input_coordinate_file) with open(self.system_path, "rb") as file: system = openmm.XmlSerializer.deserialize(file.read().decode()) if not self.enable_pbc: for force_index in range(system.getNumForces()): force = system.getForce(force_index) if not isinstance(force, openmm.NonbondedForce): continue force.setNonbondedMethod( 0 ) # NoCutoff = 0, NonbondedMethod.CutoffNonPeriodic = 1 # TODO: Expose the constraint tolerance integrator = openmm.VerletIntegrator(0.002 * simtk_unit.picoseconds) simulation = app.Simulation( input_pdb_file.topology, system, integrator, platform ) box_vectors = input_pdb_file.topology.getPeriodicBoxVectors() if box_vectors is None: box_vectors = simulation.system.getDefaultPeriodicBoxVectors() simulation.context.setPeriodicBoxVectors(*box_vectors) simulation.context.setPositions(input_pdb_file.positions) simulation.minimizeEnergy( pint_quantity_to_openmm(self.tolerance), self.max_iterations ) positions = simulation.context.getState(getPositions=True).getPositions() self.output_coordinate_file = os.path.join(directory, "minimised.pdb") with open(self.output_coordinate_file, "w+") as minimised_file: app.PDBFile.writeFile(simulation.topology, positions, minimised_file) logging.info("Energy minimised: " + self.id) return self._get_output_dictionary()
[docs]@register_calculation_protocol() class RunOpenMMSimulation(BaseProtocol): """Performs a molecular dynamics simulation in a given ensemble using an OpenMM backend. """ class _Checkpoint: """A temporary checkpoint file which keeps track of the parts of the simulation state not stored in the checkpoint state xml file. """ def __init__( self, output_frequency=-1, checkpoint_frequency=-1, steps_per_iteration=-1, current_step_number=0, ): self.output_frequency = output_frequency self.checkpoint_frequency = checkpoint_frequency self.steps_per_iteration = steps_per_iteration self.current_step_number = current_step_number def __getstate__(self): return { "output_frequency": self.output_frequency, "checkpoint_frequency": self.checkpoint_frequency, "steps_per_iteration": self.steps_per_iteration, "current_step_number": self.current_step_number, } def __setstate__(self, state): self.output_frequency = state["output_frequency"] self.checkpoint_frequency = state["checkpoint_frequency"] self.steps_per_iteration = state["steps_per_iteration"] self.current_step_number = state["current_step_number"] class _Simulation: """A fake simulation class to use with the openmm file reporters. """ def __init__(self, integrator, topology, system, current_step): self.integrator = integrator self.topology = topology self.system = system self.currentStep = current_step steps_per_iteration = protocol_input( docstring="The number of steps to propogate the system by at " "each iteration. The total number of steps performed " "by this protocol will be `total_number_of_iterations * " "steps_per_iteration`.", type_hint=int, merge_behavior=InequalityMergeBehaviour.LargestValue, default_value=1000000, ) total_number_of_iterations = protocol_input( docstring="The number of times to propogate the system forward by the " "`steps_per_iteration` number of steps. The total number of " "steps performed by this protocol will be `total_number_of_iterations * " "steps_per_iteration`.", type_hint=int, merge_behavior=InequalityMergeBehaviour.LargestValue, default_value=1, ) output_frequency = protocol_input( docstring="The frequency (in number of steps) with which to write to the " "output statistics and trajectory files.", type_hint=int, merge_behavior=InequalityMergeBehaviour.SmallestValue, default_value=3000, ) checkpoint_frequency = protocol_input( docstring="The frequency (in multiples of `output_frequency`) with which to " "write to a checkpoint file, e.g. if `output_frequency=100` and " "`checkpoint_frequency==2`, a checkpoint file would be saved every " "200 steps.", type_hint=int, merge_behavior=InequalityMergeBehaviour.SmallestValue, optional=True, default_value=10, ) timestep = protocol_input( docstring="The timestep to evolve the system by at each step.", type_hint=unit.Quantity, merge_behavior=InequalityMergeBehaviour.SmallestValue, default_value=2.0 * unit.femtosecond, ) thermodynamic_state = protocol_input( docstring="The thermodynamic conditions to simulate under", type_hint=ThermodynamicState, default_value=UNDEFINED, ) ensemble = protocol_input( docstring="The thermodynamic ensemble to simulate in.", type_hint=Ensemble, default_value=Ensemble.NPT, ) thermostat_friction = protocol_input( docstring="The thermostat friction coefficient.", type_hint=unit.Quantity, merge_behavior=InequalityMergeBehaviour.SmallestValue, default_value=1.0 / unit.picoseconds, ) input_coordinate_file = protocol_input( docstring="The file path to the starting coordinates.", type_hint=str, default_value=UNDEFINED, ) system_path = protocol_input( docstring="A path to the XML system object which defines the forces present " "in the system.", type_hint=str, default_value=UNDEFINED, ) enable_pbc = protocol_input( docstring="If true, periodic boundary conditions will be enabled.", type_hint=bool, default_value=True, ) allow_gpu_platforms = protocol_input( docstring="If true, OpenMM will be allowed to run using a GPU if available, " "otherwise it will be constrained to only using CPUs.", type_hint=bool, default_value=True, ) high_precision = protocol_input( docstring="If true, OpenMM will be run using a platform with high precision " "settings. This will be the Reference platform when only a CPU is " "available, or double precision mode when a GPU is available.", type_hint=bool, default_value=False, ) output_coordinate_file = protocol_output( docstring="The file path to the coordinates of the final system configuration.", type_hint=str, ) trajectory_file_path = protocol_output( docstring="The file path to the trajectory sampled during the simulation.", type_hint=str, ) statistics_file_path = protocol_output( docstring="The file path to the statistics sampled during the simulation.", type_hint=str, )
[docs] def __init__(self, protocol_id): super().__init__(protocol_id) self._checkpoint_path = None self._state_path = None self._local_trajectory_path = None self._local_statistics_path = None self._context = None self._integrator = None
[docs] def execute(self, directory, available_resources): # We handle most things in OMM units here. temperature = self.thermodynamic_state.temperature openmm_temperature = pint_quantity_to_openmm(temperature) pressure = ( None if self.ensemble == Ensemble.NVT else self.thermodynamic_state.pressure ) openmm_pressure = pint_quantity_to_openmm(pressure) if openmm_temperature is None: return PropertyEstimatorException( directory=directory, message="A temperature must be set to perform " "a simulation in any ensemble", ) if Ensemble(self.ensemble) == Ensemble.NPT and openmm_pressure is None: return PropertyEstimatorException( directory=directory, message="A pressure must be set to perform an NPT simulation", ) if Ensemble(self.ensemble) == Ensemble.NPT and self.enable_pbc is False: return PropertyEstimatorException( directory=directory, message="PBC must be enabled when running in the NPT ensemble.", ) logging.info( "Performing a simulation in the " + str(self.ensemble) + " ensemble: " + self.id ) # Set up the internal file paths self._checkpoint_path = os.path.join(directory, "checkpoint.json") self._state_path = os.path.join(directory, "checkpoint_state.xml") self._local_trajectory_path = os.path.join(directory, "trajectory.dcd") self._local_statistics_path = os.path.join(directory, "openmm_statistics.csv") # Set up the simulation objects. if self._context is None or self._integrator is None: self._context, self._integrator = self._setup_simulation_objects( openmm_temperature, openmm_pressure, available_resources ) # Save a copy of the starting configuration if it doesn't already exist local_input_coordinate_path = os.path.join(directory, "input.pdb") if not os.path.isfile(local_input_coordinate_path): input_pdb_file = app.PDBFile(self.input_coordinate_file) with open(local_input_coordinate_path, "w+") as configuration_file: app.PDBFile.writeFile( input_pdb_file.topology, input_pdb_file.positions, configuration_file, ) # Run the simulation. result = self._simulate(directory, self._context, self._integrator) if isinstance(result, PropertyEstimatorException): return result # Set the output paths. self.trajectory_file_path = self._local_trajectory_path self.statistics_file_path = os.path.join(directory, "statistics.csv") # Save out the final statistics in the property estimator format self._save_final_statistics(self.statistics_file_path, temperature, pressure) return self._get_output_dictionary()
def _setup_simulation_objects(self, temperature, pressure, available_resources): """Initializes the objects needed to perform the simulation. This comprises of a context, and an integrator. Parameters ---------- temperature: simtk.unit.Quantity The temperature to run the simulation at. pressure: simtk.unit.Quantity The pressure to run the simulation at. available_resources: ComputeResources The resources available to run on. Returns ------- simtk.openmm.Context The created openmm context which takes advantage of the available compute resources. openmmtools.integrators.LangevinIntegrator The Langevin integrator which will propogate the simulation. """ import openmmtools from simtk.openmm import XmlSerializer # Create a platform with the correct resources. if not self.allow_gpu_platforms: from propertyestimator.backends import ComputeResources available_resources = ComputeResources( available_resources.number_of_threads ) platform = setup_platform_with_resources( available_resources, self.high_precision ) # Load in the system object from the provided xml file. with open(self.system_path, "r") as file: system = XmlSerializer.deserialize(file.read()) # Disable the periodic boundary conditions if requested. if not self.enable_pbc: disable_pbc(system) pressure = None # Use the openmmtools ThermodynamicState object to help # set up a system which contains the correct barostat if # one should be present. openmm_state = openmmtools.states.ThermodynamicState( system=system, temperature=temperature, pressure=pressure ) system = openmm_state.get_system(remove_thermostat=True) # Set up the integrator. thermostat_friction = pint_quantity_to_openmm(self.thermostat_friction) timestep = pint_quantity_to_openmm(self.timestep) integrator = openmmtools.integrators.LangevinIntegrator( temperature=temperature, collision_rate=thermostat_friction, timestep=timestep, ) # Create the simulation context. context = openmm.Context(system, integrator, platform) # Initialize the context with the correct positions etc. input_pdb_file = app.PDBFile(self.input_coordinate_file) if self.enable_pbc: # Optionally set up the box vectors. box_vectors = input_pdb_file.topology.getPeriodicBoxVectors() if box_vectors is None: raise ValueError( "The input file must contain box vectors " "when running with PBC." ) context.setPeriodicBoxVectors(*box_vectors) context.setPositions(input_pdb_file.positions) context.setVelocitiesToTemperature(temperature) return context, integrator def _write_checkpoint_file(self, current_step_number, context): """Writes a simulation checkpoint file to disk. Parameters ---------- current_step_number: int The total number of steps which have been taken so far. context: simtk.openmm.Context The current OpenMM context. """ # Write the current state to disk state = context.getState( getPositions=True, getEnergy=True, getVelocities=True, getForces=True, getParameters=True, enforcePeriodicBox=self.enable_pbc, ) with open(self._state_path, "w") as file: file.write(openmm.XmlSerializer.serialize(state)) checkpoint = self._Checkpoint( self.output_frequency, self.checkpoint_frequency, self.steps_per_iteration, current_step_number, ) with open(self._checkpoint_path, "w") as file: json.dump(checkpoint, file, cls=TypedJSONEncoder) def _truncate_statistics_file(self, number_of_frames): """Truncates the statistics file to the specified number of frames. Parameters ---------- number_of_frames: int The number of frames to truncate to. """ with open(self._local_statistics_path) as file: header_line = file.readline() file_contents = re.sub("#.*\n", "", file.read()) with io.StringIO(file_contents) as string_object: existing_statistics_array = pd.read_csv( string_object, index_col=False, header=None ) statistics_length = len(existing_statistics_array) if statistics_length < number_of_frames: raise ValueError( f"The saved number of statistics frames ({statistics_length}) " f"is less than expected ({number_of_frames})." ) elif statistics_length == number_of_frames: return truncated_statistics_array = existing_statistics_array[0:number_of_frames] with open(self._local_statistics_path, "w") as file: file.write(f"{header_line}") truncated_statistics_array.to_csv(file, index=False, header=False) def _truncate_trajectory_file(self, number_of_frames): """Truncates the trajectory file to the specified number of frames. Parameters ---------- number_of_frames: int The number of frames to truncate to. """ import mdtraj from mdtraj.formats.dcd import DCDTrajectoryFile from mdtraj.utils import in_units_of # Load in the required topology object. topology = mdtraj.load_topology(self.input_coordinate_file) # Parse the internal mdtraj distance unit. While private access is # undesirable, this is never publicly defined and I believe this # route to be preferable over hard coding this unit here. base_distance_unit = mdtraj.Trajectory._distance_unit # Get an accurate measurement of the length of the trajectory # without reading it into memory. trajectory_length = 0 for chunk in mdtraj.iterload(self._local_trajectory_path, top=topology): trajectory_length += len(chunk) # Make sure there is at least the expected number of frames. if trajectory_length < number_of_frames: raise ValueError( f"The saved number of trajectory frames ({trajectory_length}) " f"is less than expected ({number_of_frames})." ) elif trajectory_length == number_of_frames: return # Truncate the trajectory by streaming one frame of the trajectory at # a time. temporary_trajectory_path = f"{self._local_trajectory_path}.tmp" with DCDTrajectoryFile(self._local_trajectory_path, "r") as input_file: with DCDTrajectoryFile(temporary_trajectory_path, "w") as output_file: for frame_index in range(0, number_of_frames): frame = input_file.read_as_traj(topology, n_frames=1, stride=1) output_file.write( xyz=in_units_of( frame.xyz, base_distance_unit, output_file.distance_unit ), cell_lengths=in_units_of( frame.unitcell_lengths, base_distance_unit, output_file.distance_unit, ), cell_angles=frame.unitcell_angles[0], ) os.replace(temporary_trajectory_path, self._local_trajectory_path) # Do a sanity check to make sure the trajectory was successfully truncated. new_trajectory_length = 0 for chunk in mdtraj.iterload( self._local_trajectory_path, top=self.input_coordinate_file ): new_trajectory_length += len(chunk) if new_trajectory_length != number_of_frames: raise ValueError("The trajectory was incorrectly truncated.") def _resume_from_checkpoint(self, context): """Resumes the simulation from a checkpoint file. Parameters ---------- context: simtk.openmm.Context The current OpenMM context. Returns ------- int The current step number. """ current_step_number = 0 # Check whether the checkpoint files actually exists. if not os.path.isfile(self._checkpoint_path) or not os.path.isfile( self._state_path ): logging.info("No checkpoint files were found.") return current_step_number if not os.path.isfile(self._local_statistics_path) or not os.path.isfile( self._local_trajectory_path ): raise ValueError( "Checkpoint files were correctly found, but the trajectory " "or statistics files seem to be missing. This should not happen." ) logging.info("Restoring the system state from checkpoint files.") # If they do, load the current state from disk. with open(self._state_path, "r") as file: current_state = openmm.XmlSerializer.deserialize(file.read()) with open(self._checkpoint_path, "r") as file: checkpoint = json.load(file, cls=TypedJSONDecoder) if ( self.output_frequency != checkpoint.output_frequency or self.checkpoint_frequency != checkpoint.checkpoint_frequency or self.steps_per_iteration != checkpoint.steps_per_iteration ): raise ValueError( "Neither the output frequency, the checkpoint " "frequency, nor the steps per iteration can " "currently be changed during the course of the " "simulation. Only the number of iterations is " "allowed to change." ) # Make sure this simulation hasn't already finished. total_expected_number_of_steps = ( self.total_number_of_iterations * self.steps_per_iteration ) if checkpoint.current_step_number == total_expected_number_of_steps: return checkpoint.current_step_number context.setState(current_state) # Make sure that the number of frames in the trajectory / # statistics file correspond to the recorded number of steps. # This is to handle possible cases where only some of the files # have been written from the current step (i.e only the trajectory may # have been written to before this protocol gets unexpectedly killed. expected_number_of_frames = int( checkpoint.current_step_number / self.output_frequency ) # Handle the truncation of the statistics file. self._truncate_statistics_file(expected_number_of_frames) # Handle the truncation of the trajectory file. self._truncate_trajectory_file(expected_number_of_frames) logging.info("System state restored from checkpoint files.") return checkpoint.current_step_number def _save_final_statistics(self, path, temperature, pressure): """Converts the openmm statistic csv file into a propertyestimator StatisticsArray csv file, making sure to fill in any missing entries. Parameters ---------- path: str The path to save the statistics to. temperature: unit.Quantity The temperature that the simulation is being run at. pressure: unit.Quantity The pressure that the simulation is being run at. """ statistics = StatisticsArray.from_openmm_csv( self._local_statistics_path, pressure ) reduced_potentials = ( statistics[ObservableType.PotentialEnergy] / unit.avogadro_number ) if pressure is not None: pv_terms = pressure * statistics[ObservableType.Volume] reduced_potentials += pv_terms beta = 1.0 / (unit.boltzmann_constant * temperature) statistics[ObservableType.ReducedPotential] = (beta * reduced_potentials).to( unit.dimensionless ) statistics.to_pandas_csv(path) def _simulate(self, directory, context, integrator): """Performs the simulation using a given context and integrator. Parameters ---------- directory: str The directory the trajectory is being run in. context: simtk.openmm.Context The OpenMM context to run with. integrator: simtk.openmm.Integrator The integrator to evolve the simulation with. """ # Define how many steps should be taken. total_number_of_steps = ( self.total_number_of_iterations * self.steps_per_iteration ) # Try to load the current state from any available checkpoint information current_step = self._resume_from_checkpoint(context) if current_step == total_number_of_steps: return None # Build the reporters which we will use to report the state # of the simulation. append_trajectory = os.path.isfile(self._local_trajectory_path) dcd_reporter = app.DCDReporter( self._local_trajectory_path, 0, append_trajectory ) statistics_file = open(self._local_statistics_path, "a+") statistics_reporter = app.StateDataReporter( statistics_file, 0, step=True, potentialEnergy=True, kineticEnergy=True, totalEnergy=True, temperature=True, volume=True, density=True, ) # Create the object which will transfer simulation output to the # reporters. topology = app.PDBFile(self.input_coordinate_file).topology with open(self.system_path, "r") as file: system = openmm.XmlSerializer.deserialize(file.read()) simulation = self._Simulation(integrator, topology, system, current_step) # Perform the simulation. checkpoint_counter = 0 try: while current_step < total_number_of_steps: steps_to_take = min( self.output_frequency, total_number_of_steps - current_step ) integrator.step(steps_to_take) current_step += steps_to_take state = context.getState( getPositions=True, getEnergy=True, getVelocities=False, getForces=False, getParameters=False, enforcePeriodicBox=self.enable_pbc, ) simulation.currentStep = current_step # Write out the current state using the reporters. dcd_reporter.report(simulation, state) statistics_reporter.report(simulation, state) if checkpoint_counter >= self.checkpoint_frequency: # Save to the checkpoint file if needed. self._write_checkpoint_file(current_step, context) checkpoint_counter = 0 checkpoint_counter += 1 except Exception as e: formatted_exception = ( f"{traceback.format_exception(None, e, e.__traceback__)}" ) return PropertyEstimatorException( directory=directory, message=f"The simulation failed unexpectedly: " f"{formatted_exception}", ) # Save out the final positions. self._write_checkpoint_file(current_step, context) final_state = context.getState(getPositions=True) positions = final_state.getPositions() topology.setPeriodicBoxVectors(final_state.getPeriodicBoxVectors()) self.output_coordinate_file = os.path.join(directory, "output.pdb") with open(self.output_coordinate_file, "w+") as configuration_file: app.PDBFile.writeFile(topology, positions, configuration_file) logging.info( f"Simulation performed in the {str(self.ensemble)} ensemble: {self._id}" ) return None