# Module: openff.evaluator.datasets.curation.workflow
import logging
from typing import List, Union, overload
import numpy
import pandas
from openff.evaluator._pydantic import BaseModel, Field
from openff.evaluator.datasets import PhysicalPropertyDataSet
from openff.evaluator.datasets.curation.components import CurationComponent
from openff.evaluator.datasets.curation.components.conversion import (
ConversionComponentSchema,
)
from openff.evaluator.datasets.curation.components.filtering import (
FilterComponentSchema,
)
from openff.evaluator.datasets.curation.components.selection import (
SelectionComponentSchema,
)
from openff.evaluator.datasets.curation.components.thermoml import (
ThermoMLComponentSchema,
)
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
class CurationWorkflowSchema(BaseModel):
    """A schema which encodes how a set of curation components should be applied
    sequentially to a data set."""

    # The ordered list of component schemas. Pydantic validates each entry
    # against the union of the supported component schema types.
    component_schemas: List[
        Union[
            ConversionComponentSchema,
            FilterComponentSchema,
            SelectionComponentSchema,
            ThermoMLComponentSchema,
        ]
    ] = Field(
        default_factory=list,
        description="The schemas of the components to apply as part of this workflow. "
        "The components will be applied in the order they appear in this list.",
    )
class CurationWorkflow:
    """A convenience class for applying a set of curation components
    sequentially to a data set."""

    @classmethod
    @overload
    def apply(
        cls,
        data_set: PhysicalPropertyDataSet,
        schema: CurationWorkflowSchema,
        n_processes: int = 1,
    ) -> PhysicalPropertyDataSet: ...

    @classmethod
    @overload
    def apply(
        cls,
        data_set: pandas.DataFrame,
        schema: CurationWorkflowSchema,
        n_processes: int = 1,
    ) -> pandas.DataFrame: ...

    @classmethod
    def apply(cls, data_set, schema, n_processes=1):
        """Apply each component of this curation workflow to an initial data set in
        sequence.

        Parameters
        ----------
        data_set
            The data set to apply the workflow to. This may either be a
            data set object or its pandas representation.
        schema
            The schema which defines the components to apply.
        n_processes
            The number of processes that each component is allowed to
            parallelize across.

        Returns
        -------
        The data set which has had the curation workflow applied to it.
        Returned as the same type (``PhysicalPropertyDataSet`` or
        ``pandas.DataFrame``) as the ``data_set`` argument.
        """
        # Registry mapping component class names to their implementations.
        component_classes = CurationComponent.components

        # Work on a pandas representation internally, converting back at the end.
        data_frame = data_set

        if isinstance(data_frame, PhysicalPropertyDataSet):
            data_frame = data_frame.to_pandas()

        # Copy so the caller's frame is never mutated; normalize missing
        # values (e.g. None) to NaN so components see a consistent sentinel.
        data_frame = data_frame.copy()
        data_frame = data_frame.fillna(value=numpy.nan)

        for component_schema in schema.component_schemas:
            # Each schema class is named ``<Component>Schema``; strip the
            # suffix to look up the matching component implementation.
            component_class_name = component_schema.__class__.__name__.replace(
                "Schema", ""
            )
            component_class = component_classes[component_class_name]

            logger.info(f"Applying {component_class_name}")

            data_frame = component_class.apply(
                data_frame, component_schema, n_processes
            )

            logger.info(f"{component_class_name} applied")

            # Re-normalize after each component in case it introduced None values.
            data_frame = data_frame.fillna(value=numpy.nan)

        # Convert back to a data set object if that is what the caller provided.
        if isinstance(data_set, PhysicalPropertyDataSet):
            data_frame = PhysicalPropertyDataSet.from_pandas(data_frame)

        return data_frame