Source code for propertyestimator.protocols.gradients

"""
A collection of protocols for reweighting cached simulation data.
"""
import copy
import logging
import re
import typing
from os import path

import numpy as np
from simtk import openmm
from simtk.openmm import app

from propertyestimator import unit
from propertyestimator.forcefield import ForceFieldSource, SmirnoffForceFieldSource
from propertyestimator.properties.properties import (
    ParameterGradient,
    ParameterGradientKey,
)
from propertyestimator.substances import Substance
from propertyestimator.thermodynamics import ThermodynamicState
from propertyestimator.utils.exceptions import PropertyEstimatorException
from propertyestimator.utils.openmm import (
    disable_pbc,
    openmm_quantity_to_pint,
    pint_quantity_to_openmm,
    setup_platform_with_resources,
)
from propertyestimator.utils.quantities import EstimatedQuantity
from propertyestimator.utils.statistics import ObservableType, StatisticsArray
from propertyestimator.workflow.decorators import (
    UNDEFINED,
    protocol_input,
    protocol_output,
)
from propertyestimator.workflow.plugins import register_calculation_protocol
from propertyestimator.workflow.protocols import BaseProtocol


[docs]@register_calculation_protocol() class GradientReducedPotentials(BaseProtocol): """A protocol to estimates the the reduced potential of the configurations of a trajectory using reverse and forward perturbed simulation parameters for use with estimating reweighted gradients using the central difference method. """ force_field_path = protocol_input( docstring="The path to the force field which contains the parameters to " "differentiate the observable with respect to. When reweighting " "observables, this should be the `target` force field.", type_hint=str, default_value=UNDEFINED, ) statistics_path = protocol_input( docstring="The path to a statistics array containing potentials " "evaluated at each frame of the trajectory using the input " "`force_field_path` and at the input `thermodynamic_state`.", type_hint=str, default_value=UNDEFINED, ) thermodynamic_state = protocol_input( docstring="The thermodynamic state to estimate the gradients at. When " "reweighting observables, this should be the `target` state.", type_hint=ThermodynamicState, default_value=UNDEFINED, ) substance = protocol_input( docstring="The substance which describes the composition of the system.", type_hint=Substance, default_value=UNDEFINED, ) coordinate_file_path = protocol_input( docstring="A path to a PDB coordinate file which describes the topology of " "the system.", type_hint=str, default_value=UNDEFINED, ) trajectory_file_path = protocol_input( docstring="A path to the trajectory of configurations", type_hint=str, default_value=UNDEFINED, ) enable_pbc = protocol_input( docstring="If true, periodic boundary conditions will be enabled when " "re-evaluating the reduced potentials.", type_hint=bool, default_value=True, ) parameter_key = protocol_input( docstring="The key of the parameter to differentiate with respect to.", type_hint=ParameterGradientKey, default_value=UNDEFINED, ) perturbation_scale = protocol_input( docstring="The amount to perturb the parameter by, such that " "p_new = p_old * (1 +/- `perturbation_scale`)", type_hint=float, default_value=1.0e-4, ) use_subset_of_force_field = protocol_input( docstring="If true, the reduced potentials will be estimated using " "an OpenMM system which only contains the parameters of " "interest", type_hint=bool, default_value=True, ) effective_sample_indices = protocol_input( docstring="This a placeholder input which is not currently implemented.", type_hint=list, default_value=UNDEFINED, optional=True, ) reverse_potentials_path = protocol_output( docstring="A file path to the energies evaluated using the parameters" "perturbed in the reverse direction.", type_hint=str, ) forward_potentials_path = protocol_output( docstring="A file path to the energies evaluated using the parameters" "perturbed in the forward direction.", type_hint=str, ) reverse_parameter_value = protocol_output( docstring="The value of the parameter perturbed in the reverse " "direction.", type_hint=unit.Quantity, ) forward_parameter_value = protocol_output( docstring="The value of the parameter perturbed in the forward " "direction.", type_hint=unit.Quantity, ) def _build_reduced_system(self, original_force_field, topology, scale_amount=None): """Produces an OpenMM system containing only forces for the specified parameter, optionally perturbed by the amount specified by `scale_amount`. Parameters ---------- original_force_field: openforcefield.typing.engines.smirnoff.ForceField The force field to create the system from (and optionally perturb). topology: openforcefield.topology.Topology The topology of the system to apply the force field to. scale_amount: float, optional The optional amount to perturb the parameter by. Returns ------- simtk.openmm.System The created system. simtk.unit.Quantity The new value of the perturbed parameter. """ # As this method deals mainly with the toolkit, we stick to # simtk units here. from openforcefield.typing.engines.smirnoff import ForceField parameter_tag = self.parameter_key.tag parameter_smirks = self.parameter_key.smirks parameter_attribute = self.parameter_key.attribute original_handler = original_force_field.get_parameter_handler(parameter_tag) original_parameter = original_handler.parameters[parameter_smirks] if self.use_subset_of_force_field: force_field = ForceField() handler = copy.deepcopy( original_force_field.get_parameter_handler(parameter_tag) ) force_field.register_parameter_handler(handler) else: force_field = copy.deepcopy(original_force_field) handler = force_field.get_parameter_handler(parameter_tag) parameter_index = None value_list = None if hasattr(original_parameter, parameter_attribute): parameter_value = getattr(original_parameter, parameter_attribute) else: attribute_split = re.split(r"(\d+)", parameter_attribute) assert len(parameter_attribute) == 2 assert hasattr(original_parameter, attribute_split[0]) parameter_attribute = attribute_split[0] parameter_index = int(attribute_split[1]) - 1 value_list = getattr(original_parameter, parameter_attribute) parameter_value = value_list[parameter_index] if scale_amount is not None: existing_parameter = handler.parameters[parameter_smirks] if np.isclose(parameter_value.value_in_unit(parameter_value.unit), 0.0): # Careful thought needs to be given to this. Consider cases such as # epsilon or sigma where negative values are not allowed. parameter_value = ( scale_amount if scale_amount > 0.0 else 0.0 ) * parameter_value.unit else: parameter_value *= 1.0 + scale_amount if value_list is None: setattr(existing_parameter, parameter_attribute, parameter_value) else: value_list[parameter_index] = parameter_value setattr(existing_parameter, parameter_attribute, value_list) system = force_field.create_openmm_system(topology) if not self.enable_pbc: disable_pbc(system) return system, parameter_value def _evaluate_reduced_potential( self, system, trajectory, file_path, compute_resources, subset_energy_corrections=None, ): """Computes the reduced potential of each frame in a trajectory using the provided system. Parameters ---------- system: simtk.openmm.System The system which encodes the interaction forces for the specified parameter. trajectory: mdtraj.Trajectory A trajectory of configurations to evaluate. file_path: str The path to save the reduced potentials to. compute_resources: ComputeResources The compute resources available to execute on. subset_energy_corrections: unit.Quantity, optional A unit.Quantity wrapped numpy.ndarray which contains a set of energies to add to the re-evaluated potential energies. This is mainly used to correct the potential energies evaluated using a subset of the force field back to energies as if evaluated using the full thing. Returns --------- StatisticsArray The array containing the reduced potentials. """ from simtk import unit as simtk_unit integrator = openmm.VerletIntegrator(0.1 * simtk_unit.femtoseconds) platform = setup_platform_with_resources(compute_resources, True) openmm_context = openmm.Context(system, integrator, platform) potentials = np.zeros(trajectory.n_frames, dtype=np.float64) reduced_potentials = np.zeros(trajectory.n_frames, dtype=np.float64) temperature = pint_quantity_to_openmm(self.thermodynamic_state.temperature) beta = 1.0 / (simtk_unit.BOLTZMANN_CONSTANT_kB * temperature) pressure = pint_quantity_to_openmm(self.thermodynamic_state.pressure) if subset_energy_corrections is None: subset_energy_corrections = ( np.zeros(trajectory.n_frames, dtype=np.float64) * simtk_unit.kilojoules_per_mole ) else: subset_energy_corrections = pint_quantity_to_openmm( subset_energy_corrections ) for frame_index in range(trajectory.n_frames): positions = trajectory.xyz[frame_index] box_vectors = trajectory.openmm_boxes(frame_index) if self.enable_pbc: openmm_context.setPeriodicBoxVectors(*box_vectors) openmm_context.setPositions(positions) state = openmm_context.getState(getEnergy=True) potential_energy = ( state.getPotentialEnergy() + subset_energy_corrections[frame_index] ) unreduced_potential = potential_energy / simtk_unit.AVOGADRO_CONSTANT_NA if pressure is not None and self.enable_pbc: unreduced_potential += pressure * state.getPeriodicBoxVolume() potentials[frame_index] = potential_energy.value_in_unit( simtk_unit.kilojoule_per_mole ) reduced_potentials[frame_index] = unreduced_potential * beta potentials *= unit.kilojoule / unit.mole reduced_potentials *= unit.dimensionless statistics_array = StatisticsArray() statistics_array[ObservableType.ReducedPotential] = reduced_potentials statistics_array[ObservableType.PotentialEnergy] = potentials statistics_array.to_pandas_csv(file_path) return statistics_array
[docs] def execute(self, directory, available_resources): import mdtraj from openforcefield.topology import Molecule, Topology logging.info( f"Calculating the reduced gradient potentials for {self.parameter_key}: {self._id}" ) with open(self.force_field_path) as file: force_field_source = ForceFieldSource.parse_json(file.read()) if not isinstance(force_field_source, SmirnoffForceFieldSource): return PropertyEstimatorException( directory, "Only SMIRNOFF force fields are supported by " "this protocol.", ) # Load in the inputs force_field = force_field_source.to_force_field() trajectory = mdtraj.load_dcd( self.trajectory_file_path, self.coordinate_file_path ) unique_molecules = [] for component in self.substance.components: molecule = Molecule.from_smiles(smiles=component.smiles) unique_molecules.append(molecule) pdb_file = app.PDBFile(self.coordinate_file_path) topology = Topology.from_openmm( pdb_file.topology, unique_molecules=unique_molecules ) # Compute the difference between the energies using the reduced force field, # and the full force field. energy_corrections = None if self.use_subset_of_force_field: target_system, _ = self._build_reduced_system(force_field, topology) subset_potentials_path = path.join(directory, f"subset.csv") subset_potentials = self._evaluate_reduced_potential( target_system, trajectory, subset_potentials_path, available_resources ) full_statistics = StatisticsArray.from_pandas_csv(self.statistics_path) energy_corrections = ( full_statistics[ObservableType.PotentialEnergy] - subset_potentials[ObservableType.PotentialEnergy] ) # Build the slightly perturbed system. reverse_system, reverse_parameter_value = self._build_reduced_system( force_field, topology, -self.perturbation_scale ) forward_system, forward_parameter_value = self._build_reduced_system( force_field, topology, self.perturbation_scale ) self.reverse_parameter_value = openmm_quantity_to_pint(reverse_parameter_value) self.forward_parameter_value = openmm_quantity_to_pint(forward_parameter_value) # Calculate the reduced potentials. self.reverse_potentials_path = path.join(directory, "reverse.csv") self.forward_potentials_path = path.join(directory, "forward.csv") self._evaluate_reduced_potential( reverse_system, trajectory, self.reverse_potentials_path, available_resources, energy_corrections, ) self._evaluate_reduced_potential( forward_system, trajectory, self.forward_potentials_path, available_resources, energy_corrections, ) logging.info(f"Finished calculating the reduced gradient potentials.") return self._get_output_dictionary()
[docs]@register_calculation_protocol() class CentralDifferenceGradient(BaseProtocol): """A protocol which employs the central diference method to estimate the gradient of an observable A, such that grad = (A(x-h) - A(x+h)) / (2h) Notes ----- The `values` input must either be a list of unit.Quantity, a ProtocolPath to a list of unit.Quantity, or a list of ProtocolPath which each point to a unit.Quantity. """ parameter_key = protocol_input( docstring="The key of the parameter to differentiate with respect to.", type_hint=ParameterGradientKey, default_value=UNDEFINED, ) reverse_observable_value = protocol_input( docstring="The value of the observable evaluated using the parameters" "perturbed in the reverse direction.", type_hint=typing.Union[unit.Quantity, EstimatedQuantity], default_value=UNDEFINED, ) forward_observable_value = protocol_input( docstring="The value of the observable evaluated using the parameters" "perturbed in the forward direction.", type_hint=typing.Union[unit.Quantity, EstimatedQuantity], default_value=UNDEFINED, ) reverse_parameter_value = protocol_input( docstring="The value of the parameter perturbed in the reverse " "direction.", type_hint=unit.Quantity, default_value=UNDEFINED, ) forward_parameter_value = protocol_input( docstring="The value of the parameter perturbed in the forward " "direction.", type_hint=unit.Quantity, default_value=UNDEFINED, ) gradient = protocol_output( docstring="The estimated gradient", type_hint=ParameterGradient )
[docs] def execute(self, directory, available_resources): if self.forward_parameter_value < self.reverse_parameter_value: return PropertyEstimatorException( f"The forward parameter value ({self.forward_parameter_value}) must " f"be larger than the reverse value ({self.reverse_parameter_value})." ) reverse_value = self.reverse_observable_value forward_value = self.forward_observable_value if isinstance(reverse_value, EstimatedQuantity): reverse_value = reverse_value.value if isinstance(forward_value, EstimatedQuantity): forward_value = forward_value.value gradient = (forward_value - reverse_value) / ( self.forward_parameter_value - self.reverse_parameter_value ) self.gradient = ParameterGradient(self.parameter_key, gradient) return self._get_output_dictionary()