Source code for propertyestimator.protocols.openmm

A collection of protocols which employs OpenMM to evaluate and propagate the
state of molecular systems.
import copy
import io
import json
import logging
import os
import re

import numpy as np
import pandas as pd
from simtk import openmm
from simtk import unit as simtk_unit
from simtk.openmm import app

from propertyestimator import unit
from propertyestimator.forcefield import ForceFieldSource, SmirnoffForceFieldSource
from propertyestimator.protocols.gradients import BaseGradientPotentials
from propertyestimator.protocols.reweighting import BaseReducedPotentials
from propertyestimator.protocols.simulation import (
from propertyestimator.thermodynamics import Ensemble
from propertyestimator.utils.openmm import (
from propertyestimator.utils.serialization import TypedJSONDecoder, TypedJSONEncoder
from propertyestimator.utils.statistics import ObservableType, StatisticsArray
from propertyestimator.workflow.plugins import workflow_protocol

logger = logging.getLogger(__name__)

[docs]@workflow_protocol() class OpenMMEnergyMinimisation(BaseEnergyMinimisation): """A protocol to minimise the potential energy of a system using OpenMM. """ def _execute(self, directory, available_resources): platform = setup_platform_with_resources(available_resources) input_pdb_file = app.PDBFile(self.input_coordinate_file) with open(self.system_path, "rb") as file: system = openmm.XmlSerializer.deserialize( if not self.enable_pbc: for force_index in range(system.getNumForces()): force = system.getForce(force_index) if not isinstance(force, openmm.NonbondedForce): continue force.setNonbondedMethod( 0 ) # NoCutoff = 0, NonbondedMethod.CutoffNonPeriodic = 1 # TODO: Expose the constraint tolerance integrator = openmm.VerletIntegrator(0.002 * simtk_unit.picoseconds) simulation = app.Simulation( input_pdb_file.topology, system, integrator, platform ) box_vectors = input_pdb_file.topology.getPeriodicBoxVectors() if box_vectors is None: box_vectors = simulation.system.getDefaultPeriodicBoxVectors() simulation.context.setPeriodicBoxVectors(*box_vectors) simulation.context.setPositions(input_pdb_file.positions) simulation.minimizeEnergy( pint_quantity_to_openmm(self.tolerance), self.max_iterations ) positions = simulation.context.getState(getPositions=True).getPositions() self.output_coordinate_file = os.path.join(directory, "minimised.pdb") with open(self.output_coordinate_file, "w+") as minimised_file: app.PDBFile.writeFile(simulation.topology, positions, minimised_file)
[docs]@workflow_protocol() class OpenMMSimulation(BaseSimulation): """Performs a molecular dynamics simulation in a given ensemble using an OpenMM backend. """ class _Checkpoint: """A temporary checkpoint file which keeps track of the parts of the simulation state not stored in the checkpoint state xml file. """ def __init__( self, output_frequency=-1, checkpoint_frequency=-1, steps_per_iteration=-1, current_step_number=0, ): self.output_frequency = output_frequency self.checkpoint_frequency = checkpoint_frequency self.steps_per_iteration = steps_per_iteration self.current_step_number = current_step_number def __getstate__(self): return { "output_frequency": self.output_frequency, "checkpoint_frequency": self.checkpoint_frequency, "steps_per_iteration": self.steps_per_iteration, "current_step_number": self.current_step_number, } def __setstate__(self, state): self.output_frequency = state["output_frequency"] self.checkpoint_frequency = state["checkpoint_frequency"] self.steps_per_iteration = state["steps_per_iteration"] self.current_step_number = state["current_step_number"] class _Simulation: """A fake simulation class to use with the openmm file reporters. """ def __init__(self, integrator, topology, system, current_step): self.integrator = integrator self.topology = topology self.system = system self.currentStep = current_step
[docs] def __init__(self, protocol_id): super().__init__(protocol_id) self._checkpoint_path = None self._state_path = None self._local_trajectory_path = None self._local_statistics_path = None self._context = None self._integrator = None
def _execute(self, directory, available_resources): # We handle most things in OMM units here. temperature = self.thermodynamic_state.temperature openmm_temperature = pint_quantity_to_openmm(temperature) pressure = ( None if self.ensemble == Ensemble.NVT else self.thermodynamic_state.pressure ) openmm_pressure = pint_quantity_to_openmm(pressure) if openmm_temperature is None: raise ValueError( "A temperature must be set to perform a simulation in any ensemble" ) if Ensemble(self.ensemble) == Ensemble.NPT and openmm_pressure is None: raise ValueError("A pressure must be set to perform an NPT simulation") if Ensemble(self.ensemble) == Ensemble.NPT and self.enable_pbc is False: raise ValueError("PBC must be enabled when running in the NPT ensemble.") # Set up the internal file paths self._checkpoint_path = os.path.join(directory, "checkpoint.json") self._state_path = os.path.join(directory, "checkpoint_state.xml") self._local_trajectory_path = os.path.join(directory, "trajectory.dcd") self._local_statistics_path = os.path.join(directory, "openmm_statistics.csv") # Set up the simulation objects. if self._context is None or self._integrator is None: self._context, self._integrator = self._setup_simulation_objects( openmm_temperature, openmm_pressure, available_resources ) # Save a copy of the starting configuration if it doesn't already exist local_input_coordinate_path = os.path.join(directory, "input.pdb") if not os.path.isfile(local_input_coordinate_path): input_pdb_file = app.PDBFile(self.input_coordinate_file) with open(local_input_coordinate_path, "w+") as configuration_file: app.PDBFile.writeFile( input_pdb_file.topology, input_pdb_file.positions, configuration_file, ) # Run the simulation. self._simulate(directory, self._context, self._integrator) # Set the output paths. self.trajectory_file_path = self._local_trajectory_path self.statistics_file_path = os.path.join(directory, "statistics.csv") # Save out the final statistics in the property estimator format self._save_final_statistics(self.statistics_file_path, temperature, pressure) def _setup_simulation_objects(self, temperature, pressure, available_resources): """Initializes the objects needed to perform the simulation. This comprises of a context, and an integrator. Parameters ---------- temperature: simtk.pint.Quantity The temperature to run the simulation at. pressure: simtk.pint.Quantity The pressure to run the simulation at. available_resources: ComputeResources The resources available to run on. Returns ------- simtk.openmm.Context The created openmm context which takes advantage of the available compute resources. openmmtools.integrators.LangevinIntegrator The Langevin integrator which will propogate the simulation. """ import openmmtools from simtk.openmm import XmlSerializer # Create a platform with the correct resources. if not self.allow_gpu_platforms: from propertyestimator.backends import ComputeResources available_resources = ComputeResources( available_resources.number_of_threads ) platform = setup_platform_with_resources( available_resources, self.high_precision ) # Load in the system object from the provided xml file. with open(self.system_path, "r") as file: system = XmlSerializer.deserialize( # Disable the periodic boundary conditions if requested. if not self.enable_pbc: disable_pbc(system) pressure = None # Use the openmmtools ThermodynamicState object to help # set up a system which contains the correct barostat if # one should be present. openmm_state = openmmtools.states.ThermodynamicState( system=system, temperature=temperature, pressure=pressure ) system = openmm_state.get_system(remove_thermostat=True) # Set up the integrator. thermostat_friction = pint_quantity_to_openmm(self.thermostat_friction) timestep = pint_quantity_to_openmm(self.timestep) integrator = openmmtools.integrators.LangevinIntegrator( temperature=temperature, collision_rate=thermostat_friction, timestep=timestep, ) # Create the simulation context. context = openmm.Context(system, integrator, platform) # Initialize the context with the correct positions etc. input_pdb_file = app.PDBFile(self.input_coordinate_file) if self.enable_pbc: # Optionally set up the box vectors. box_vectors = input_pdb_file.topology.getPeriodicBoxVectors() if box_vectors is None: raise ValueError( "The input file must contain box vectors when running with PBC." ) context.setPeriodicBoxVectors(*box_vectors) context.setPositions(input_pdb_file.positions) context.setVelocitiesToTemperature(temperature) return context, integrator def _write_checkpoint_file(self, current_step_number, context): """Writes a simulation checkpoint file to disk. Parameters ---------- current_step_number: int The total number of steps which have been taken so far. context: simtk.openmm.Context The current OpenMM context. """ # Write the current state to disk state = context.getState( getPositions=True, getEnergy=True, getVelocities=True, getForces=True, getParameters=True, enforcePeriodicBox=self.enable_pbc, ) with open(self._state_path, "w") as file: file.write(openmm.XmlSerializer.serialize(state)) checkpoint = self._Checkpoint( self.output_frequency, self.checkpoint_frequency, self.steps_per_iteration, current_step_number, ) with open(self._checkpoint_path, "w") as file: json.dump(checkpoint, file, cls=TypedJSONEncoder) def _truncate_statistics_file(self, number_of_frames): """Truncates the statistics file to the specified number of frames. Parameters ---------- number_of_frames: int The number of frames to truncate to. """ with open(self._local_statistics_path) as file: header_line = file.readline() file_contents = re.sub("#.*\n", "", with io.StringIO(file_contents) as string_object: existing_statistics_array = pd.read_csv( string_object, index_col=False, header=None ) statistics_length = len(existing_statistics_array) if statistics_length < number_of_frames: raise ValueError( f"The saved number of statistics frames ({statistics_length}) " f"is less than expected ({number_of_frames})." ) elif statistics_length == number_of_frames: return truncated_statistics_array = existing_statistics_array[0:number_of_frames] with open(self._local_statistics_path, "w") as file: file.write(f"{header_line}") truncated_statistics_array.to_csv(file, index=False, header=False) def _truncate_trajectory_file(self, number_of_frames): """Truncates the trajectory file to the specified number of frames. Parameters ---------- number_of_frames: int The number of frames to truncate to. """ import mdtraj from mdtraj.formats.dcd import DCDTrajectoryFile from mdtraj.utils import in_units_of # Load in the required topology object. topology = mdtraj.load_topology(self.input_coordinate_file) # Parse the internal mdtraj distance unit. While private access is # undesirable, this is never publicly defined and I believe this # route to be preferable over hard coding this unit here. # noinspection PyProtectedMember base_distance_unit = mdtraj.Trajectory._distance_unit # Get an accurate measurement of the length of the trajectory # without reading it into memory. trajectory_length = 0 for chunk in mdtraj.iterload(self._local_trajectory_path, top=topology): trajectory_length += len(chunk) # Make sure there is at least the expected number of frames. if trajectory_length < number_of_frames: raise ValueError( f"The saved number of trajectory frames ({trajectory_length}) " f"is less than expected ({number_of_frames})." ) elif trajectory_length == number_of_frames: return # Truncate the trajectory by streaming one frame of the trajectory at # a time. temporary_trajectory_path = f"{self._local_trajectory_path}.tmp" with DCDTrajectoryFile(self._local_trajectory_path, "r") as input_file: with DCDTrajectoryFile(temporary_trajectory_path, "w") as output_file: for frame_index in range(0, number_of_frames): frame = input_file.read_as_traj(topology, n_frames=1, stride=1) output_file.write( xyz=in_units_of(, base_distance_unit, output_file.distance_unit ), cell_lengths=in_units_of( frame.unitcell_lengths, base_distance_unit, output_file.distance_unit, ), cell_angles=frame.unitcell_angles[0], ) os.replace(temporary_trajectory_path, self._local_trajectory_path) # Do a sanity check to make sure the trajectory was successfully truncated. new_trajectory_length = 0 for chunk in mdtraj.iterload( self._local_trajectory_path, top=self.input_coordinate_file ): new_trajectory_length += len(chunk) if new_trajectory_length != number_of_frames: raise ValueError("The trajectory was incorrectly truncated.") def _resume_from_checkpoint(self, context): """Resumes the simulation from a checkpoint file. Parameters ---------- context: simtk.openmm.Context The current OpenMM context. Returns ------- int The current step number. """ current_step_number = 0 # Check whether the checkpoint files actually exists. if not os.path.isfile(self._checkpoint_path) or not os.path.isfile( self._state_path ):"No checkpoint files were found.") return current_step_number if not os.path.isfile(self._local_statistics_path) or not os.path.isfile( self._local_trajectory_path ): raise ValueError( "Checkpoint files were correctly found, but the trajectory " "or statistics files seem to be missing. This should not happen." )"Restoring the system state from checkpoint files.") # If they do, load the current state from disk. with open(self._state_path, "r") as file: current_state = openmm.XmlSerializer.deserialize( with open(self._checkpoint_path, "r") as file: checkpoint = json.load(file, cls=TypedJSONDecoder) if ( self.output_frequency != checkpoint.output_frequency or self.checkpoint_frequency != checkpoint.checkpoint_frequency or self.steps_per_iteration != checkpoint.steps_per_iteration ): raise ValueError( "Neither the output frequency, the checkpoint " "frequency, nor the steps per iteration can " "currently be changed during the course of the " "simulation. Only the number of iterations is " "allowed to change." ) # Make sure this simulation hasn't already finished. total_expected_number_of_steps = ( self.total_number_of_iterations * self.steps_per_iteration ) if checkpoint.current_step_number == total_expected_number_of_steps: return checkpoint.current_step_number context.setState(current_state) # Make sure that the number of frames in the trajectory / # statistics file correspond to the recorded number of steps. # This is to handle possible cases where only some of the files # have been written from the current step (i.e only the trajectory may # have been written to before this protocol gets unexpectedly killed. expected_number_of_frames = int( checkpoint.current_step_number / self.output_frequency ) # Handle the truncation of the statistics file. self._truncate_statistics_file(expected_number_of_frames) # Handle the truncation of the trajectory file. self._truncate_trajectory_file(expected_number_of_frames)"System state restored from checkpoint files.") return checkpoint.current_step_number def _save_final_statistics(self, path, temperature, pressure): """Converts the openmm statistic csv file into a propertyestimator StatisticsArray csv file, making sure to fill in any missing entries. Parameters ---------- path: str The path to save the statistics to. temperature: pint.Quantity The temperature that the simulation is being run at. pressure: pint.Quantity The pressure that the simulation is being run at. """ statistics = StatisticsArray.from_openmm_csv( self._local_statistics_path, pressure ) reduced_potentials = ( statistics[ObservableType.PotentialEnergy] / unit.avogadro_constant ) if pressure is not None: pv_terms = pressure * statistics[ObservableType.Volume] reduced_potentials += pv_terms beta = 1.0 / (unit.boltzmann_constant * temperature) statistics[ObservableType.ReducedPotential] = (beta * reduced_potentials).to( unit.dimensionless ) statistics.to_pandas_csv(path) def _simulate(self, directory, context, integrator): """Performs the simulation using a given context and integrator. Parameters ---------- directory: str The directory the trajectory is being run in. context: simtk.openmm.Context The OpenMM context to run with. integrator: simtk.openmm.Integrator The integrator to evolve the simulation with. """ # Define how many steps should be taken. total_number_of_steps = ( self.total_number_of_iterations * self.steps_per_iteration ) # Try to load the current state from any available checkpoint information current_step = self._resume_from_checkpoint(context) if current_step == total_number_of_steps: return # Build the reporters which we will use to report the state # of the simulation. append_trajectory = os.path.isfile(self._local_trajectory_path) dcd_reporter = app.DCDReporter( self._local_trajectory_path, 0, append_trajectory ) statistics_file = open(self._local_statistics_path, "a+") statistics_reporter = app.StateDataReporter( statistics_file, 0, step=True, potentialEnergy=True, kineticEnergy=True, totalEnergy=True, temperature=True, volume=True, density=True, ) # Create the object which will transfer simulation output to the # reporters. topology = app.PDBFile(self.input_coordinate_file).topology with open(self.system_path, "r") as file: system = openmm.XmlSerializer.deserialize( simulation = self._Simulation(integrator, topology, system, current_step) # Perform the simulation. checkpoint_counter = 0 while current_step < total_number_of_steps: steps_to_take = min( self.output_frequency, total_number_of_steps - current_step ) integrator.step(steps_to_take) current_step += steps_to_take state = context.getState( getPositions=True, getEnergy=True, getVelocities=False, getForces=False, getParameters=False, enforcePeriodicBox=self.enable_pbc, ) simulation.currentStep = current_step # Write out the current state using the reporters., state), state) if checkpoint_counter >= self.checkpoint_frequency: # Save to the checkpoint file if needed. self._write_checkpoint_file(current_step, context) checkpoint_counter = 0 checkpoint_counter += 1 # Save out the final positions. self._write_checkpoint_file(current_step, context) final_state = context.getState(getPositions=True) positions = final_state.getPositions() topology.setPeriodicBoxVectors(final_state.getPeriodicBoxVectors()) self.output_coordinate_file = os.path.join(directory, "output.pdb") with open(self.output_coordinate_file, "w+") as configuration_file: app.PDBFile.writeFile(topology, positions, configuration_file)
[docs]@workflow_protocol() class OpenMMReducedPotentials(BaseReducedPotentials): """Calculates the reduced potential for a given set of configurations using OpenMM. """ def _execute(self, directory, available_resources): import openmmtools import mdtraj trajectory = mdtraj.load_dcd( self.trajectory_file_path, self.coordinate_file_path ) with open(self.system_path, "r") as file: system = openmm.XmlSerializer.deserialize( temperature = pint_quantity_to_openmm(self.thermodynamic_state.temperature) pressure = pint_quantity_to_openmm(self.thermodynamic_state.pressure) if self.enable_pbc: system.setDefaultPeriodicBoxVectors(*trajectory.openmm_boxes(0)) else: pressure = None openmm_state = openmmtools.states.ThermodynamicState( system=system, temperature=temperature, pressure=pressure ) integrator = openmmtools.integrators.VelocityVerletIntegrator( 0.01 * simtk_unit.femtoseconds ) # Setup the requested platform: platform = setup_platform_with_resources( available_resources, self.high_precision ) openmm_system = openmm_state.get_system(True, True) if not self.enable_pbc: disable_pbc(openmm_system) openmm_context = openmm.Context(openmm_system, integrator, platform) potential_energies = np.zeros(trajectory.n_frames) reduced_potentials = np.zeros(trajectory.n_frames) for frame_index in range(trajectory.n_frames): if self.enable_pbc: box_vectors = trajectory.openmm_boxes(frame_index) openmm_context.setPeriodicBoxVectors(*box_vectors) positions =[frame_index] openmm_context.setPositions(positions) potential_energy = openmm_context.getState( getEnergy=True ).getPotentialEnergy() potential_energies[frame_index] = potential_energy.value_in_unit( simtk_unit.kilojoule_per_mole ) reduced_potentials[frame_index] = openmm_state.reduced_potential( openmm_context ) kinetic_energies = StatisticsArray.from_pandas_csv(self.kinetic_energies_path)[ ObservableType.KineticEnergy ] statistics_array = StatisticsArray() statistics_array[ObservableType.PotentialEnergy] = ( potential_energies * unit.kilojoule / unit.mole ) statistics_array[ObservableType.KineticEnergy] = kinetic_energies statistics_array[ObservableType.ReducedPotential] = ( reduced_potentials * unit.dimensionless ) statistics_array[ObservableType.TotalEnergy] = ( statistics_array[ObservableType.PotentialEnergy] + statistics_array[ObservableType.KineticEnergy] ) statistics_array[ObservableType.Enthalpy] = ( statistics_array[ObservableType.ReducedPotential] * self.thermodynamic_state.inverse_beta + kinetic_energies ) if self.use_internal_energy: statistics_array[ObservableType.ReducedPotential] += ( kinetic_energies * self.thermodynamic_state.beta ) self.statistics_file_path = os.path.join(directory, "statistics.csv") statistics_array.to_pandas_csv(self.statistics_file_path)
[docs]@workflow_protocol() class OpenMMGradientPotentials(BaseGradientPotentials): """A protocol to estimates the the reduced potential of the configurations of a trajectory using reverse and forward perturbed simulation parameters for use with estimating reweighted gradients using the central difference method. """ def _build_reduced_system(self, original_force_field, topology, scale_amount=None): """Produces an OpenMM system containing only forces for the specified parameter, optionally perturbed by the amount specified by `scale_amount`. Parameters ---------- original_force_field: openforcefield.typing.engines.smirnoff.ForceField The force field to create the system from (and optionally perturb). topology: openforcefield.topology.Topology The topology of the system to apply the force field to. scale_amount: float, optional The optional amount to perturb the parameter by. Returns ------- simtk.openmm.System The created system. simtk.pint.Quantity The new value of the perturbed parameter. """ # As this method deals mainly with the toolkit, we stick to # simtk units here. from openforcefield.typing.engines.smirnoff import ForceField parameter_tag = self.parameter_key.tag parameter_smirks = self.parameter_key.smirks parameter_attribute = self.parameter_key.attribute original_handler = original_force_field.get_parameter_handler(parameter_tag) original_parameter = original_handler.parameters[parameter_smirks] if self.use_subset_of_force_field: force_field = ForceField() handler = copy.deepcopy( original_force_field.get_parameter_handler(parameter_tag) ) force_field.register_parameter_handler(handler) else: force_field = copy.deepcopy(original_force_field) handler = force_field.get_parameter_handler(parameter_tag) parameter_index = None value_list = None if hasattr(original_parameter, parameter_attribute): parameter_value = getattr(original_parameter, parameter_attribute) else: attribute_split = re.split(r"(\d+)", parameter_attribute) assert len(parameter_attribute) == 2 assert hasattr(original_parameter, attribute_split[0]) parameter_attribute = attribute_split[0] parameter_index = int(attribute_split[1]) - 1 value_list = getattr(original_parameter, parameter_attribute) parameter_value = value_list[parameter_index] if scale_amount is not None: existing_parameter = handler.parameters[parameter_smirks] if np.isclose(parameter_value.value_in_unit(parameter_value.unit), 0.0): # Careful thought needs to be given to this. Consider cases such as # epsilon or sigma where negative values are not allowed. parameter_value = ( scale_amount if scale_amount > 0.0 else 0.0 ) * parameter_value.unit else: parameter_value *= 1.0 + scale_amount if value_list is None: setattr(existing_parameter, parameter_attribute, parameter_value) else: value_list[parameter_index] = parameter_value setattr(existing_parameter, parameter_attribute, value_list) system = force_field.create_openmm_system(topology) if not self.enable_pbc: disable_pbc(system) return system, parameter_value def _evaluate_reduced_potential( self, system, trajectory, file_path, compute_resources, subset_energy_corrections=None, ): """Computes the reduced potential of each frame in a trajectory using the provided system. Parameters ---------- system: simtk.openmm.System The system which encodes the interaction forces for the specified parameter. trajectory: mdtraj.Trajectory A trajectory of configurations to evaluate. file_path: str The path to save the reduced potentials to. compute_resources: ComputeResources The compute resources available to execute on. subset_energy_corrections: pint.Quantity, optional A pint.Quantity wrapped numpy.ndarray which contains a set of energies to add to the re-evaluated potential energies. This is mainly used to correct the potential energies evaluated using a subset of the force field back to energies as if evaluated using the full thing. Returns --------- StatisticsArray The array containing the reduced potentials. """ integrator = openmm.VerletIntegrator(0.1 * simtk_unit.femtoseconds) platform = setup_platform_with_resources(compute_resources, True) openmm_context = openmm.Context(system, integrator, platform) potentials = np.zeros(trajectory.n_frames, dtype=np.float64) reduced_potentials = np.zeros(trajectory.n_frames, dtype=np.float64) temperature = pint_quantity_to_openmm(self.thermodynamic_state.temperature) beta = 1.0 / (simtk_unit.BOLTZMANN_CONSTANT_kB * temperature) pressure = pint_quantity_to_openmm(self.thermodynamic_state.pressure) if subset_energy_corrections is None: subset_energy_corrections = ( np.zeros(trajectory.n_frames, dtype=np.float64) * simtk_unit.kilojoules_per_mole ) else: subset_energy_corrections = pint_quantity_to_openmm( subset_energy_corrections ) for frame_index in range(trajectory.n_frames): positions =[frame_index] box_vectors = trajectory.openmm_boxes(frame_index) if self.enable_pbc: openmm_context.setPeriodicBoxVectors(*box_vectors) openmm_context.setPositions(positions) state = openmm_context.getState(getEnergy=True) potential_energy = ( state.getPotentialEnergy() + subset_energy_corrections[frame_index] ) unreduced_potential = potential_energy / simtk_unit.AVOGADRO_CONSTANT_NA if pressure is not None and self.enable_pbc: unreduced_potential += pressure * state.getPeriodicBoxVolume() potentials[frame_index] = potential_energy.value_in_unit( simtk_unit.kilojoule_per_mole ) reduced_potentials[frame_index] = unreduced_potential * beta potentials *= unit.kilojoule / unit.mole reduced_potentials *= unit.dimensionless statistics_array = StatisticsArray() statistics_array[ObservableType.ReducedPotential] = reduced_potentials statistics_array[ObservableType.PotentialEnergy] = potentials statistics_array.to_pandas_csv(file_path) return statistics_array def _execute(self, directory, available_resources): import mdtraj from openforcefield.topology import Molecule, Topology with open(self.force_field_path) as file: force_field_source = ForceFieldSource.parse_json( if not isinstance(force_field_source, SmirnoffForceFieldSource): raise ValueError( "Only SMIRNOFF force fields are supported by this protocol.", ) # Load in the inputs force_field = force_field_source.to_force_field() trajectory = mdtraj.load_dcd( self.trajectory_file_path, self.coordinate_file_path ) unique_molecules = [] for component in self.substance.components: molecule = Molecule.from_smiles(smiles=component.smiles) unique_molecules.append(molecule) pdb_file = app.PDBFile(self.coordinate_file_path) topology = Topology.from_openmm( pdb_file.topology, unique_molecules=unique_molecules ) # Compute the difference between the energies using the reduced force field, # and the full force field. energy_corrections = None if self.use_subset_of_force_field: target_system, _ = self._build_reduced_system(force_field, topology) subset_potentials_path = os.path.join(directory, f"subset.csv") subset_potentials = self._evaluate_reduced_potential( target_system, trajectory, subset_potentials_path, available_resources ) full_statistics = StatisticsArray.from_pandas_csv(self.statistics_path) energy_corrections = ( full_statistics[ObservableType.PotentialEnergy] - subset_potentials[ObservableType.PotentialEnergy] ) # Build the slightly perturbed system. reverse_system, reverse_parameter_value = self._build_reduced_system( force_field, topology, -self.perturbation_scale ) forward_system, forward_parameter_value = self._build_reduced_system( force_field, topology, self.perturbation_scale ) self.reverse_parameter_value = openmm_quantity_to_pint(reverse_parameter_value) self.forward_parameter_value = openmm_quantity_to_pint(forward_parameter_value) # Calculate the reduced potentials. self.reverse_potentials_path = os.path.join(directory, "reverse.csv") self.forward_potentials_path = os.path.join(directory, "forward.csv") self._evaluate_reduced_potential( reverse_system, trajectory, self.reverse_potentials_path, available_resources, energy_corrections, ) self._evaluate_reduced_potential( forward_system, trajectory, self.forward_potentials_path, available_resources, energy_corrections, )