Source code for propertyestimator.protocols.miscellaneous

"""
A collection of miscellaneous protocols, mostly aimed at performing simple
math operations.
"""
import numpy as np
import typing

from propertyestimator import unit
from propertyestimator.properties import ParameterGradient
from propertyestimator.substances import Substance
from propertyestimator.utils.exceptions import PropertyEstimatorException
from propertyestimator.utils.quantities import EstimatedQuantity
from propertyestimator.workflow.decorators import protocol_input, protocol_output
from propertyestimator.workflow.plugins import register_calculation_protocol
from propertyestimator.workflow.protocols import BaseProtocol


[docs]@register_calculation_protocol() class AddValues(BaseProtocol): """A protocol to add together a list of values. Notes ----- The `values` input must either be a list of unit.Quantity, a ProtocolPath to a list of unit.Quantity, or a list of ProtocolPath which each point to a unit.Quantity. """ @protocol_input(list) def values(self): """The values to add together.""" pass @protocol_output(typing.Union[int, float, EstimatedQuantity, unit.Quantity, ParameterGradient]) def result(self): """The sum of the values.""" pass
[docs] def __init__(self, protocol_id): """Constructs a new AddValues object.""" super().__init__(protocol_id) self._values = None self._result = None
[docs] def execute(self, directory, available_resources): if len(self._values) < 1: return PropertyEstimatorException(directory, 'There were no gradients to add together') if not all(isinstance(x, type(self._values[0])) for x in self._values): return PropertyEstimatorException(directory, f'All values to add together must be ' f'the same type ({" ".join(map(str, self._values))}).') self._result = self._values[0] for value in self._values[1:]: self._result += value return self._get_output_dictionary()
[docs]@register_calculation_protocol() class SubtractValues(BaseProtocol): """A protocol to subtract one value from another such that: `result = value_b - value_a` """ @protocol_input(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def value_a(self): """`value_a` in the formula `result = value_b - value_a`""" pass @protocol_input(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def value_b(self): """`value_b` in the formula `result = value_b - value_a`""" pass @protocol_output(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def result(self): """The sum of the values.""" pass
[docs] def __init__(self, protocol_id): """Constructs a new AddValues object.""" super().__init__(protocol_id) self._value_a = None self._value_b = None self._result = None
[docs] def execute(self, directory, available_resources): self._result = self._value_b - self._value_a return self._get_output_dictionary()
[docs]@register_calculation_protocol() class MultiplyValue(BaseProtocol): """A protocol which multiplies a value by a specified scalar """ @protocol_input(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def value(self): """The value to multiply.""" pass @protocol_input(typing.Union[int, float, unit.Quantity]) def multiplier(self): """The scalar to multiply by.""" pass @protocol_output(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def result(self): """The result of the multiplication.""" pass
[docs] def __init__(self, protocol_id): """Constructs a new MultiplyValue object.""" super().__init__(protocol_id) self._value = None self._multiplier = None self._result = None
[docs] def execute(self, directory, available_resources): if isinstance(self._value, EstimatedQuantity): self._result = EstimatedQuantity(self._value.value * self._multiplier, self._value.uncertainty * self._multiplier, *self._value.sources) else: self._result = self._value * self._multiplier return self._get_output_dictionary()
[docs]@register_calculation_protocol() class DivideValue(BaseProtocol): """A protocol which divides a value by a specified scalar """ @protocol_input(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def value(self): """The value to divide.""" pass @protocol_input(typing.Union[int, float, unit.Quantity]) def divisor(self): """The scalar to divide by.""" pass @protocol_output(typing.Union[int, float, unit.Quantity, EstimatedQuantity, ParameterGradient]) def result(self): """The result of the division.""" pass
[docs] def __init__(self, protocol_id): """Constructs a new DivideValue object.""" super().__init__(protocol_id) self._value = None self._divisor = None self._result = None
[docs] def execute(self, directory, available_resources): self._result = self._value / self._divisor return self._get_output_dictionary()
[docs]@register_calculation_protocol() class BaseWeightByMoleFraction(BaseProtocol): """Multiplies a value by the mole fraction of a component in a mixture substance. """ @protocol_input(Substance) def component(self, value): """The component (e.g water) to which this value belongs.""" pass @protocol_input(Substance) def full_substance(self, value): """The full substance of which the component of interest is a part.""" pass
[docs] def __init__(self, protocol_id): super().__init__(protocol_id) self._value = None self._component = None self._full_substance = None self._weighted_value = None
def _weight_values(self, mole_fraction): """Weights a value by a components mole fraction. Parameters ---------- mole_fraction: float The mole fraction to weight by. Returns ------- Any The weighted value. """ raise NotImplementedError()
[docs] def execute(self, directory, available_resources): assert len(self._component.components) == 1 main_component = self._component.components[0] amounts = self._full_substance.get_amounts(main_component) if len(amounts) != 1: return PropertyEstimatorException(directory=directory, message=f'More than one type of amount was defined for component ' f'{main_component}. Only a single mole fraction must be ' f'defined.') amount = next(iter(amounts)) if not isinstance(amount, Substance.MoleFraction): return PropertyEstimatorException(directory=directory, message=f'The component {main_component} was given as an ' f'exact amount, and not a mole fraction') self._weighted_value = self._weight_values(amount.value) return self._get_output_dictionary()
[docs]@register_calculation_protocol() class WeightByMoleFraction(BaseWeightByMoleFraction): """Multiplies a value by the mole fraction of a component in a `Substance`. """ @protocol_input(typing.Union[float, int, EstimatedQuantity, unit.Quantity, ParameterGradient]) def value(self): """The value to be weighted.""" pass @protocol_output(typing.Union[float, int, EstimatedQuantity, unit.Quantity, ParameterGradient]) def weighted_value(self, value): """The value weighted by the `component`s mole fraction as determined from the `full_substance`.""" pass def _weight_values(self, mole_fraction): """ Returns ------- float, int, EstimatedQuantity, unit.Quantity, ParameterGradient The weighted value. """ return self._value * mole_fraction
[docs]@register_calculation_protocol() class FilterSubstanceByRole(BaseProtocol): """A protocol which takes a substance as input, and returns a substance which only contains components whose role match a given criteria. """ @protocol_input(Substance) def input_substance(self): """The substance to filter.""" pass @protocol_input(Substance.ComponentRole) def component_role(self): """The role to filter substance components against.""" pass @protocol_input(int) def expected_components(self): """The number of components expected to remain after filtering. An exception is raised if this number is not matched. Setting this value to -1 will disable this check.""" pass @protocol_output(Substance) def filtered_substance(self): """The filtered substance.""" pass
[docs] def __init__(self, protocol_id): """Constructs a new AddValues object.""" super().__init__(protocol_id) self._input_substance = None self._component_role = None self._expected_components = -1 self._filtered_substance = None
[docs] def execute(self, directory, available_resources): filtered_components = [] total_mole_fraction = 0.0 for component in self._input_substance.components: if component.role != self._component_role: continue filtered_components.append(component) amounts = self._input_substance.get_amounts(component) for amount in amounts: if not isinstance(amount, Substance.MoleFraction): continue total_mole_fraction += amount.value if 0 <= self._expected_components != len(filtered_components): return PropertyEstimatorException(directory=directory, message=f'The filtered substance does not contain the expected ' f'number of components ({self._expected_components}) - ' f'{filtered_components}') inverse_mole_fraction = 1.0 if np.isclose(total_mole_fraction, 0.0) else 1.0 / total_mole_fraction self._filtered_substance = Substance() for component in filtered_components: amounts = self._input_substance.get_amounts(component) for amount in amounts: if isinstance(amount, Substance.MoleFraction): amount = Substance.MoleFraction(amount.value * inverse_mole_fraction) self._filtered_substance.add_component(component, amount) return self._get_output_dictionary()