Source code for openff.evaluator.datasets.curation.workflow

import logging
from typing import List, Union, overload

import numpy
import pandas

from openff.evaluator._pydantic import BaseModel, Field
from openff.evaluator.datasets import PhysicalPropertyDataSet
from openff.evaluator.datasets.curation.components import CurationComponent
from openff.evaluator.datasets.curation.components.conversion import (
    ConversionComponentSchema,
)
from openff.evaluator.datasets.curation.components.filtering import (
    FilterComponentSchema,
)
from openff.evaluator.datasets.curation.components.selection import (
    SelectionComponentSchema,
)
from openff.evaluator.datasets.curation.components.thermoml import (
    ThermoMLComponentSchema,
)

logger = logging.getLogger(__name__)


class CurationWorkflowSchema(BaseModel):
    """A schema which encodes how a set of curation components should be applied
    sequentially to a data set."""

    # NOTE(review): pydantic may try the Union members in the order listed when
    # parsing serialized entries, so keep this ordering stable.
    component_schemas: List[
        Union[
            ConversionComponentSchema,
            FilterComponentSchema,
            SelectionComponentSchema,
            ThermoMLComponentSchema,
        ]
    ] = Field(
        description="The schemas of the components to apply as part of this workflow. "
        "The components will be applied in the order they appear in this list.",
        default_factory=list,
    )
class CurationWorkflow:
    """A convenience class for applying a set of curation components sequentially
    to a data set."""

    @classmethod
    @overload
    def apply(
        cls,
        data_set: PhysicalPropertyDataSet,
        schema: CurationWorkflowSchema,
        n_processes: int = 1,
    ) -> PhysicalPropertyDataSet: ...

    @classmethod
    @overload
    def apply(
        cls,
        data_set: pandas.DataFrame,
        schema: CurationWorkflowSchema,
        n_processes: int = 1,
    ) -> pandas.DataFrame: ...

    @classmethod
    def apply(cls, data_set, schema, n_processes=1):
        """Apply each component of this curation workflow to an initial data set in
        sequence.

        Parameters
        ----------
        data_set
            The data set to apply the workflow to. This may either be a data set
            object or its pandas representation. A copy is curated; the frame
            passed in is not modified in place.
        schema
            The schema which defines the components to apply. Components are
            applied in the order they appear in ``schema.component_schemas``.
        n_processes
            The number of processes that each component is allowed to parallelize
            across.

        Returns
        -------
            The data set which has had the curation workflow applied to it. A
            ``PhysicalPropertyDataSet`` is returned when one was passed in,
            otherwise a ``pandas.DataFrame``.

        Raises
        ------
        KeyError
            If no curation component is registered for one of the component
            schemas.
        """
        component_classes = CurationComponent.components

        data_frame = data_set

        if isinstance(data_frame, PhysicalPropertyDataSet):
            data_frame = data_frame.to_pandas()

        # Work on a copy so the caller's data frame is not modified in place.
        data_frame = data_frame.copy()
        # Normalise missing entries (e.g. ``None``) to NaN; this is repeated after
        # every component below.
        data_frame = data_frame.fillna(value=numpy.nan)

        for component_schema in schema.component_schemas:
            schema_class_name = component_schema.__class__.__name__

            # The component is looked up by its schema's class name with "Schema"
            # removed, e.g. ``FooSchema`` -> ``Foo``.
            component_class_name = schema_class_name.replace("Schema", "")

            try:
                component_class = component_classes[component_class_name]
            except KeyError:
                # Re-raise with context instead of a bare ``KeyError: 'Foo'``.
                raise KeyError(
                    f"No curation component named {component_class_name!r} is "
                    f"registered for the {schema_class_name} schema."
                ) from None

            logger.info("Applying %s", component_class_name)

            data_frame = component_class.apply(
                data_frame, component_schema, n_processes
            )

            logger.info("%s applied", component_class_name)

            data_frame = data_frame.fillna(value=numpy.nan)

        # Hand back the same kind of object the caller passed in.
        if isinstance(data_set, PhysicalPropertyDataSet):
            data_frame = PhysicalPropertyDataSet.from_pandas(data_frame)

        return data_frame