Source code for propertyestimator.storage.localfile

"""
A local file based storage backend.
"""
import json
import shutil
from os import path, makedirs

from propertyestimator.storage import StoredSimulationData
from propertyestimator.storage.dataclasses import BaseStoredData
from propertyestimator.substances import Substance
from propertyestimator.utils.serialization import TypedJSONEncoder, TypedJSONDecoder
from .storage import PropertyEstimatorStorage


[docs]class LocalFileStorage(PropertyEstimatorStorage): """A storage backend which stores files in directories on the local disk. """ @property def root_directory(self): """str: Returns the directory in which all stored objects are located.""" return self.root_directory
[docs] def __init__(self, root_directory='stored_data'): self._root_directory = root_directory if not path.isdir(root_directory): makedirs(root_directory) super().__init__()
def _store_object(self, storage_key, object_to_store): file_path = path.join(self._root_directory, storage_key) # If the object to store is a simple string we write that # directly, otherwise we try and JSONify the object. if not isinstance(object_to_store, str): object_to_store = json.dumps(object_to_store, cls=TypedJSONEncoder) with open(file_path, 'w') as file: file.write(object_to_store) super(LocalFileStorage, self)._store_object(storage_key, object_to_store) def _retrieve_object(self, storage_key): if not self._has_object(storage_key): return None file_path = path.join(self._root_directory, storage_key) with open(file_path, 'r') as file: loaded_object_string = file.read() try: loaded_object = json.loads(loaded_object_string, cls=TypedJSONDecoder) except json.JSONDecodeError: loaded_object = loaded_object_string return loaded_object def _has_object(self, storage_key): file_path = path.join(self._root_directory, storage_key) if not path.isfile(file_path): return False return True
[docs] def store_simulation_data(self, data_object, data_directory): unique_id = super(LocalFileStorage, self).store_simulation_data(data_object, data_directory) storage_directory_path = path.join(self._root_directory, f'{unique_id}_data') if path.isdir(storage_directory_path): shutil.rmtree(storage_directory_path, ignore_errors=True) shutil.move(data_directory, storage_directory_path) return unique_id
[docs] def retrieve_simulation_data_by_id(self, unique_id): """Attempts to retrieve a storage piece of simulation data from it's unique id. Parameters ---------- unique_id: str The unique id assigned to the data. Returns ------- BaseStoredData The stored data object. str The path to the data's corresponding directory. """ stored_object = self._retrieve_object(unique_id) # Make sure the stored object is a valid object. if not isinstance(stored_object, BaseStoredData): return None, None data_directory = path.join(self._root_directory, f'{unique_id}_data') return stored_object, data_directory
[docs] def retrieve_simulation_data(self, substance, include_component_data=True, data_class=StoredSimulationData): substance_ids = {substance.identifier} # Find the substance identifiers of the substance components if # we should include component data. if isinstance(substance, Substance) and include_component_data is True: for component in substance.components: component_substance = Substance() component_substance.add_component(component, Substance.MoleFraction()) substance_ids.add(component_substance.identifier) return_data = {} for substance_id in substance_ids: if substance_id not in self._simulation_data_by_substance: continue return_data[substance_id] = [] for data_key in self._simulation_data_by_substance[substance_id]: data_object, data_directory = self.retrieve_simulation_data_by_id(data_key) if data_object is None: continue if not isinstance(data_object, data_class): continue return_data[substance_id].append((data_object, data_directory)) return return_data