# coding: utf-8
# Distributed under the terms of the MIT License.
"""This module implements various workflows: ways of chaining together
different calculations for high-throughput use.
"""
import abc
import os
import logging
from matador.utils.print_utils import dumps
# Logger shared by every workflow object defined in this module.
LOG = logging.getLogger(name="run3")
class Workflow:
    """Workflow objects are bundles of calculations defined as
    :obj:`WorkflowStep` objects. Each :obj:`WorkflowStep` takes three arguments:
    the :obj:`matador.compute.ComputeTask` object used to run the calculations, the calculation
    parameters (which can be modified by each step), and the seed name.

    Any subclass of Workflow must implement `preprocess` and `postprocess`
    methods (even if they just return True).

    Note:
        The whole workflow (preprocessing, running every step, postprocessing
        and cleaning up) is executed from ``__init__``.

    Attributes:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be running the computation.
        calc_doc (dict): the interim dictionary of structural and
            calculation parameters.
        seed (str): the root seed for the calculation.
        label (str): the name of the type of the Workflow object.
        compute_dir (str): the compute directory taken from ``computer``;
            if falsy, no separate compute directory is tidied up.
        success (bool): the status of the workflow. This is only set to True after
            final step completes, but BEFORE post-processing.
        steps (:obj:`list` of :obj:`WorkflowStep`): list of steps to be
            completed.
        clean_after_step (:obj:`list` of :obj:`bool`): one flag per step, saying
            whether `_clean_up` is called after that step runs.
        workflow_params (dict): any extra keyword arguments given to the constructor.
    """

    def __init__(self, computer, calc_doc, seed, **workflow_kwargs):
        """Initialise the Workflow object from a :obj:`matador.compute.ComputeTask`, calculation
        parameters and the seed name, then run it: `preprocess`, every step,
        `postprocess` (only on success) and finally `_clean_up`.

        Parameters:
            computer (:obj:`matador.compute.ComputeTask`): the object that will be running the computation.
            calc_doc (dict): dictionary of structure and calculation
                parameters.
            seed (str): root seed for the calculation.

        Keyword arguments:
            workflow_kwargs: stored unchanged in ``self.workflow_params``.

        Raises:
            RuntimeError: if any part of the calculation fails.
        """
        self.computer = computer
        self.calc_doc = calc_doc
        self.seed = seed
        self.label = self.__class__.__name__
        self.compute_dir = computer.compute_dir
        self.success = None
        self.steps = []
        self.clean_after_step = []
        self.workflow_params = workflow_kwargs
        LOG.info("Performing Workflow of type {} on {}".format(self.label, self.seed))

        self._apply_run3_settings()

        self.preprocess()
        LOG.info(
            "Preprocessing completed, steps to perform: {}".format(
                [step.name for step in self.steps]
            )
        )
        try:
            self.run_steps()
        except RuntimeError:
            # run_steps() sets self.success = False before re-raising, so
            # _clean_up() sends the files to the bad_castep folder.
            LOG.critical("Workflow failed: cleaning up")
            self._clean_up()
            raise
        if self.success:
            self.postprocess()
        self._clean_up()

    def _apply_run3_settings(self):
        """Copy CASTEP/OptaDOS executable names from the run3 settings onto
        the computer.

        The CASTEP executable is only overridden when ``computer.executable``
        is still ``"castep"``, i.e. it was not already changed elsewhere
        (e.g. at the command line). Sets ``self.castep_executable`` and/or
        ``self.optados_executable`` when they are applied.
        """
        if self.computer.run3_settings.get("run3_settings") is None:
            return
        # NOTE(review): the guard above inspects ``computer.run3_settings`` but
        # the values are read from ``computer.kwargs``; confirm which is intended.
        # Falling back to an empty dict avoids an AttributeError on ``None.get``
        # when only one of the two holds the settings.
        settings = self.computer.kwargs.get("run3_settings") or {}
        # check that computer.executable was not overridden at cmd-line, then check settings file
        if (
            settings.get("castep_executable") is not None
            and self.computer.executable == "castep"
        ):
            self.castep_executable = settings["castep_executable"]
            self.computer.executable = self.castep_executable
        if settings.get("optados_executable") is not None:
            self.optados_executable = settings["optados_executable"]
            self.computer.optados_executable = self.optados_executable

    @abc.abstractmethod
    def preprocess(self):
        """This function is run at the start of the workflow, and is
        responsible for adding WorkflowStep objects to the Workflow.
        """

    @abc.abstractmethod
    def postprocess(self):
        """This OPTIONAL function is run upon successful completion of all steps
        of the workflow and can be overloaded by the subclass to perform
        any postprocessing steps. This occurs *before* cleaning up the
        directory (i.e. moving to completed/bad_castep).

        Note: Workflow does not use ``abc.ABCMeta``, so the ``abstractmethod``
        decorator is not enforced and this base implementation is a no-op.
        """

    def _clean_up(self, success=None):
        """Move files to `completed/` or `bad_castep/` depending on the status
        of the workflow. It uses the current seed of the computer, so this method can be
        called at intermediate steps if this seed changes.

        If ``self.compute_dir`` is set, files are first moved from inside that
        directory (``skip_existing=False``), and then from the current working
        directory (``skip_existing=True``). The working directory is restored
        even if moving the files raises.

        Parameters:
            success (bool): status to use; if None, ``self.success`` is used.
        """
        if success is None:
            success = self.success

        if self.compute_dir:
            cwd = os.getcwd()
            os.chdir(self.compute_dir)
            try:
                self._move_results(success, from_compute_dir=True)
            finally:
                os.chdir(cwd)

        self._move_results(success, from_compute_dir=False)

    def _move_results(self, success, from_compute_dir):
        """Move the computer's current seed files from the current working
        directory to the completed (truthy ``success``) or bad_castep folder.

        Parameters:
            success (bool): the status deciding where the files go.
            from_compute_dir (bool): True when called from inside the compute
                directory; selects the log message and passes
                ``skip_existing=False`` (True otherwise) to ``mv_to_completed``.
        """
        if success:
            if from_compute_dir:
                msg = "Writing results from compute dir of Workflow {} run to completed folder and tidying up."
            else:
                msg = "Writing results of Workflow {} run to completed folder and tidying up."
            LOG.info(msg.format(self.label))
            self.computer.mv_to_completed(
                self.computer.seed, keep=True, skip_existing=not from_compute_dir
            )
        else:
            if from_compute_dir:
                msg = "Writing results from compute dir of failed Workflow {} run to bad_castep folder and tidying up."
            else:
                msg = "Writing results of failed Workflow {} run to bad_castep folder and tidying up."
            LOG.info(msg.format(self.label))
            self.computer.mv_to_bad(self.computer.seed)

    def add_step(
        self,
        function,
        name,
        input_exts=None,
        output_exts=None,
        clean_after=False,
        **func_kwargs
    ):
        """Add a step to the workflow.

        Parameters:
            function (Function): the function to run in the step; it is called as
                ``function(self.computer, self.calc_doc, self.seed, **func_kwargs)``.
            name (str): the desired name for the step (human-readable).

        Keyword arguments:
            input_exts (list): input file extensions (including '.') to back up
                as ``<seed><ext>_<name>`` after the step runs.
            output_exts (list): output file extensions (including '.') to back up
                as ``<seed><ext>_<name>`` after the step runs.
            clean_after (bool): whether or not to clean up after this step is called
            func_kwargs (dict): any arguments to pass to function when called.
        """
        self.steps.append(
            WorkflowStep(
                function, name, self.compute_dir, input_exts, output_exts, **func_kwargs
            )
        )
        self.clean_after_step.append(clean_after)

    def run_steps(self):
        """Run every step in order, calling `_clean_up` after any step that was
        added with ``clean_after=True``.

        Sets ``self.success`` to True once all steps have run, or to False if
        a RuntimeError is raised.

        Raises:
            RuntimeError: if no steps were added or any step fails; the
                original error is chained as ``__cause__``.
        """
        try:
            if not self.steps:
                msg = "No steps added to Workflow!"
                LOG.error(msg)
                raise RuntimeError(msg)
            for ind, step in enumerate(self.steps):
                LOG.info("Running step {step.name}: {step.function}".format(step=step))
                # Only serialise calc_doc when the debug message will be emitted.
                if LOG.isEnabledFor(logging.DEBUG):
                    LOG.debug("Current state: " + dumps(self.calc_doc, indent=None))
                success = step.run_step(self.computer, self.calc_doc, self.seed)
                if self.clean_after_step[ind]:
                    # A skipped step returns None, which _clean_up treats as
                    # "use self.success".
                    self._clean_up(success=success)
            self.success = True
        except RuntimeError as exc:
            self.success = False
            msg = "{} workflow exiting...".format(self.label)
            LOG.error(msg)
            raise RuntimeError(msg) from exc
class WorkflowStep:
    """An individual step in a Workflow, defined by a Python function
    and a name. The function will be called with arguments
    (computer, calc_doc, seed, **func_kwargs) by the run_step method.

    Attributes:
        function (function): the function to call.
        name (str): the human-readable name of the step.
        compute_dir (str): the folder that computer will perform the calculation in.
        func_kwargs (dict): any extra kwargs to pass to the function.
        input_exts (list): list of input file extensions to cache after running.
        output_exts (list): list of output file extensions to cache after running.
        success (bool): the result of the last call to `function`; False if it
            raised RuntimeError, None if the step was skipped.
    """

    success = False

    def __init__(
        self,
        function,
        name,
        compute_dir=None,
        input_exts=None,
        output_exts=None,
        **func_kwargs
    ):
        """Construct a WorkflowStep from a function."""
        LOG.debug("Constructing WorkflowStep: {}".format(name))
        self.function = function
        self.name = name
        self.compute_dir = compute_dir
        self.func_kwargs = func_kwargs
        self.input_exts = input_exts
        self.output_exts = output_exts

    def _cache_files(self, seed, exts, mode, directory=None):
        """Copy any files <seed><ext> for ext in exts to <seed><ext>_<name>,
        relative to the current working directory. Extensions containing '*'
        are expanded with glob.

        Parameters:
            seed (str): seed for the workflow step.
            exts (:obj:`list` of :obj:`str`): list of file extensions, including '.'.
            mode (str): either 'in' (warning printed if file missing) or 'out' (no warning).
            directory (str): currently unused; kept for backwards compatibility.

        Raises:
            OSError: if copying a file fails.
        """
        import shutil
        import glob

        for ext in exts:
            if "*" in ext:
                srcs = glob.glob("{}{}".format(seed, ext))
            else:
                srcs = ["{}{}".format(seed, ext)]
            for src in srcs:
                dst = src + "_{}".format(self.name)
                if os.path.isfile(src):
                    shutil.copy2(src, dst, follow_symlinks=True)
                    LOG.info("Backed up {} file {} to {}.".format(mode, src, dst))
                elif mode == "in":
                    LOG.warning(
                        "Failed to cache input file {} for step {}.".format(
                            src, self.name
                        )
                    )

    def _cache_inputs(self, seed):
        """Save any input files for the WorkflowStep with appropriate suffix
        as determined by the WorkflowStep name. All files <seed><ext>
        are copied to <seed><ext>_<name>, for any <ext> inside the
        `input_exts` attribute. This is called after the WorkflowStep has
        finished, even if it does not succeed.

        Parameters:
            seed (str): seed for the workflow step.
        """
        if self.input_exts is not None:
            self._cache_files(seed, self.input_exts, "in")

    def _cache_outputs(self, seed):
        """Save any output files for the WorkflowStep with appropriate suffix
        as determined by the WorkflowStep name. All files <seed><ext>
        are copied to <seed><ext>_<name>, for any <ext> inside the
        `output_exts` attribute.

        Parameters:
            seed (str): seed for the workflow step.
        """
        if self.output_exts is not None:
            self._cache_files(seed, self.output_exts, "out")

    def cache_files(self, seed):
        """Wrapper for calling both _cache_inputs and _cache_outputs inside
        ``compute_dir`` (if set), without throwing any errors.

        Any OSError raised while caching (e.g. a missing ``compute_dir`` or a
        failed copy) is logged as a warning instead of being raised, so it
        cannot mask the step's own error. The working directory is always
        restored.
        """
        cwd = os.getcwd()
        changed_dir = False
        try:
            if self.compute_dir is not None:
                os.chdir(self.compute_dir)
                changed_dir = True
            self._cache_inputs(seed)
            self._cache_outputs(seed)
        except OSError as exc:
            LOG.warning(
                "Failed to cache files for step {}: {}".format(self.name, exc)
            )
        finally:
            if changed_dir:
                os.chdir(cwd)

    def run_step(self, computer, calc_doc, seed):
        """Run the workflow step.

        Files are cached after the function returns a non-None result, and also
        after it raises RuntimeError. A None result is treated as "skipped",
        and no files are cached.

        Parameters:
            computer (:obj:`matador.compute.ComputeTask`): the object that will be running the computation.
            calc_doc (dict): dictionary of structure and calculation
                parameters.
            seed (str): root seed for the calculation.

        Returns:
            The function's return value (also stored in ``self.success``).

        Raises:
            RuntimeError: if any step fails.
        """
        try:
            LOG.info("WorkflowStep {} starting...".format(self.name))
            self.success = self.function(computer, calc_doc, seed, **self.func_kwargs)
        except RuntimeError as exc:
            msg = "WorkflowStep {} failed with error {}.".format(self.name, exc)
            LOG.error(msg)
            self.success = False
            # Cache files even on failure; cache_files() never raises OSError.
            self.cache_files(seed)
            raise

        if self.success is None:
            LOG.info(
                "WorkflowStep {} skipped, did you provide all the input files?".format(
                    self.name
                )
            )
            return self.success

        if self.success:
            LOG.info("WorkflowStep {} completed successfully.".format(self.name))
        else:
            LOG.warning("WorkflowStep {} was unsuccessful.".format(self.name))

        self.cache_files(seed)
        return self.success