# coding: utf-8
# Distributed under the terms of the MIT License.
""" This module implements the CastepSpectralWorkflow, which performs
spectral calculations with CASTEP in multiple steps:

1. Performs a singlepoint calculation (if no check file is found).
2. If the ``spectral_kpoints_mp_spacing`` keyword is found, interpolates
   the wavefunction to the DOS grid.

   - If an OptaDOS input file (.odi) with the root seedname
     is found, runs OptaDOS on the resulting density of states.

3. If the ``spectral_kpoints_path_spacing`` keyword is present, creates
   a bandstructure on the SeeK-path generated path.

"""
import os
import copy
import glob
import logging
from matador.workflows import Workflow
from matador.scrapers import arbitrary2dict
from matador.workflows.castep.common import castep_scf
# Module-level logger shared by the workflow steps in this module; the
# logger name is fixed to "run3" rather than derived from ``__name__``.
LOG = logging.getLogger("run3")
def castep_full_spectral(computer, calc_doc, seed, **kwargs):
    """Run the complete CASTEP spectral workflow for a single structure.

    This is a thin functional wrapper: it constructs a
    :class:`CastepSpectralWorkflow` from the given arguments and reports
    the ``success`` attribute of that workflow.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling CASTEP.
        calc_doc (dict): dictionary of structure and calculation parameters.
        seed (str): root seed for the calculation.

    Keyword arguments:
        **kwargs: accepted by the signature but not used in this function.

    Raises:
        RuntimeError: if any part of the calculation fails (propagated
            from the workflow; nothing is raised directly here).

    Returns:
        bool: True if Workflow completed successfully, or False otherwise.

    """
    return CastepSpectralWorkflow(computer, calc_doc, seed).success
[docs]class CastepSpectralWorkflow(Workflow):
"""Perform a "full" spectral calculation on a system, i.e. first
perform an SCF then interpolate to different kpoint paths/grids to
form DOS and dispersions. Optionally use OptaDOS for post-processing
of DOS.

Which of these steps are actually scheduled is decided in
:meth:`preprocess`, based on the keys present in ``calc_doc`` and on
which output files already exist next to the seed.

Attributes:
computer (:obj:`matador.compute.ComputeTask`): the object that calls CASTEP.
calc_doc (dict): the interim dictionary of structural and
calculation parameters.
seed (str): the root seed for the calculation.
success (bool): the status of the Workflow: only set to True after
post-processing method completes.
"""
[docs] def preprocess(self):
"""Decide which parts of the Workflow need to be performed,
and set the appropriate CASTEP parameters.

A ``todo`` table of step names is filled in from ``self.calc_doc``, from
files already present for ``self.seed`` and from the OptaDOS input file
(if one is found); every enabled step is then registered with
``self.add_step``. Finally ``self.calc_doc`` is modified in place with
the k-point path/spacing and continuation settings used by the steps.
"""
# default todo
todo = {
"scf": True,
"dos": False,
"pdos": False,
"broadening": False,
"dispersion": False,
"pdis": False,
}
# definition of steps and names
steps = {
"scf": castep_spectral_scf,
"dos": castep_spectral_dos,
"pdos": optados_pdos,
"broadening": optados_dos_broadening,
"dispersion": castep_spectral_dispersion,
"pdis": optados_pdispersion,
}
# file extensions handed to ``add_step`` as ``input_exts``/``output_exts``
# for each step name
exts = {
"scf": {
"input": [".cell", ".param"],
"output": [".castep", ".*err", "-out.cell"],
},
"dos": {
"input": [".cell", ".param"],
"output": [
".castep",
".bands",
".pdos_bin",
".dome_bin",
".*err",
"-out.cell",
],
},
"dispersion": {
"input": [".cell", ".param"],
"output": [
".castep",
".bands",
".pdos_bin",
".dome_bin",
".*err",
"-out.cell",
],
},
"pdis": {"input": [".odi", ".pdos_bin"], "output": [".odo", ".*err"]},
"pdos": {
"input": [".odi", ".pdos_bin", ".dome_bin"],
"output": [".odo", ".*err"],
},
"broadening": {
"input": [".odi", ".pdos_bin", ".dome_bin"],
"output": [".odo", ".*err"],
},
}
# an existing <seed>.check file means the initial SCF is not scheduled
if os.path.isfile(self.seed + ".check"):
LOG.info("Found {}.check, so skipping initial SCF.".format(self.seed))
todo["scf"] = False
# any k-point path/list/path-spacing keyword, or a "bandstructure"
# spectral_task, requests a dispersion; it is only scheduled when
# <seed>.bands_dispersion does not exist yet
if (
"spectral_kpoints_path" in self.calc_doc
or "spectral_kpoints_list" in self.calc_doc
or "spectral_kpoints_path_spacing" in self.calc_doc
or self.calc_doc.get("spectral_task", "").lower() == "bandstructure"
):
todo["dispersion"] = not os.path.isfile(self.seed + ".bands_dispersion")
# likewise a DOS is requested by the MP-spacing keyword or a "dos"
# spectral_task, and only scheduled when <seed>.bands_dos does not exist
if (
"spectral_kpoints_mp_spacing" in self.calc_doc
or self.calc_doc.get("spectral_task", "").lower() == "dos"
):
todo["dos"] = not os.path.isfile(self.seed + ".bands_dos")
# the keys of the OptaDOS input file decide which OptaDOS steps are added;
# note that _get_optados_fname removes an existing <seed>.odi before it
# searches the current directory for *.odi files
odi_fname = _get_optados_fname(self.seed)
if odi_fname is not None:
odi_dict, _ = arbitrary2dict(odi_fname)
if todo["dispersion"]:
todo["pdis"] = "pdispersion" in odi_dict
if todo["dos"]:
todo["broadening"] = "broadening" in odi_dict
todo["pdos"] = "pdos" in odi_dict
# register the enabled steps, iterating ``todo`` in its definition order
for key in todo:
if todo[key]:
self.add_step(
steps[key],
key,
input_exts=exts[key].get("input"),
output_exts=exts[key].get("output"),
)
# if not using a user-requested path, use seekpath and spglib
# to reduce to primitive and use consistent path
if (
"spectral_kpoints_list" not in self.calc_doc
and "spectral_kpoints_path" not in self.calc_doc
):
# imported here rather than at module level
from matador.utils.cell_utils import cart2abc
prim_doc, kpt_path = self.computer.get_seekpath_compliant_input(
self.calc_doc, self.calc_doc.get("spectral_kpoints_path_spacing", 0.05)
)
# the returned document is merged into calc_doc and lattice_abc is
# recomputed from the (possibly updated) lattice_cart
self.calc_doc.update(prim_doc)
self.calc_doc["lattice_abc"] = cart2abc(self.calc_doc["lattice_cart"])
if todo["dispersion"]:
self.calc_doc["spectral_kpoints_list"] = kpt_path
elif todo["dispersion"] and "spectral_kpoints_path" in self.calc_doc:
# this attribute is not assigned anywhere else in this method
self._user_defined_kpt_path = True
LOG.warning("Using user-defined k-point path for all structures.")
# NOTE(review): indentation was lost in this copy of the file, so it is
# unclear whether the path-spacing default below belongs to the ``elif``
# branch above or runs unconditionally -- confirm against upstream.
self.calc_doc["spectral_kpoints_path_spacing"] = self.calc_doc.get(
"spectral_kpoints_path_spacing", 0.05
)
# a scheduled DOS falls back to an MP spacing of 0.05 if none was given
if todo["dos"]:
self.calc_doc["spectral_kpoints_mp_spacing"] = self.calc_doc.get(
"spectral_kpoints_mp_spacing", 0.05
)
# always use continuation
self.calc_doc["continuation"] = "default"
def castep_spectral_scf(computer, calc_doc, seed):
    """Perform the initial singlepoint SCF that the spectral steps build on.

    A deep copy of ``calc_doc`` is adjusted so that a full checkpoint and
    the cell structure are written, any ``spectral_task`` entry is dropped,
    and the calculation is then delegated to ``castep_scf`` with the
    spectral k-point keywords listed as forbidden.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling CASTEP.
        calc_doc (dict): the structure to run on; it is not modified.
        seed (str): root filename of structure.

    Returns:
        the value returned by ``castep_scf`` for this calculation.

    """
    LOG.info("Performing CASTEP spectral SCF...")

    singlepoint_doc = copy.deepcopy(calc_doc)
    singlepoint_doc.update(
        {
            "write_checkpoint": "ALL",
            "task": "singlepoint",
            "write_cell_structure": True,
        }
    )
    # the SCF itself must not carry a spectral task
    singlepoint_doc.pop("spectral_task", None)

    spectral_only_keys = [
        "spectral_task",
        "spectral_kpoints_list",
        "spectral_kpoints_path",
        "spectral_kpoints_mp_spacing",
        "spectral_kpoints_path_spacing",
    ]
    return castep_scf(
        computer,
        singlepoint_doc,
        seed,
        required_keys=[],
        forbidden_keys=spectral_only_keys,
    )
def castep_spectral_dos(computer, calc_doc, seed):
    """Interpolate a completed SCF onto the DOS k-point grid.

    Works on a deep copy of ``calc_doc`` that is switched to a spectral
    DOS task with projected-DOS weights enabled, validates it through
    ``computer.validate_calc_doc`` and runs it as a single CASTEP shot.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling CASTEP.
        calc_doc (dict): the structure to run on; it is not modified.
        seed (str): root filename of structure.

    Returns:
        the value returned by ``computer.run_castep_singleshot``.

    """
    LOG.info("Performing CASTEP spectral DOS calculation...")

    spectral_doc = copy.deepcopy(calc_doc)
    spectral_doc.update(
        {
            "task": "spectral",
            "spectral_task": "dos",
            # disable checkpointing for BS/DOS by default, leaving just SCF
            "write_checkpoint": "none",
            "write_cell_structure": True,
            "pdos_calculate_weights": True,
        }
    )

    needed_keys = ["spectral_kpoints_mp_spacing"]
    path_keys = [
        "spectral_kpoints_list",
        "spectral_kpoints_path",
        "spectral_kpoints_path_spacing",
    ]
    computer.validate_calc_doc(spectral_doc, needed_keys, path_keys)

    return computer.run_castep_singleshot(
        spectral_doc, seed, keep=True, intermediate=True
    )
def castep_spectral_dispersion(computer, calc_doc, seed):
    """Runs a dispersion interpolation on top of a completed SCF calculation,
    optionally running orbitals2bands afterwards.

    A deep copy of ``calc_doc`` is switched to a spectral bandstructure
    task, validated and run as a single CASTEP shot. If the resulting
    document has a truthy ``write_orbitals`` entry, ``orbitals2bands`` is
    then run on one core through ``computer.run_generic``; the computer's
    ``executable`` and ``ncores`` are always restored afterwards.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling CASTEP.
        calc_doc (dict): the structure to run on; it is not modified.
        seed (str): root filename of structure.

    Returns:
        the result of the CASTEP run, or the result of ``orbitals2bands``
        when that was requested and ran without raising. If
        ``orbitals2bands`` raises, a warning is logged and the CASTEP
        result is returned.

    """
    LOG.info("Performing CASTEP spectral dispersion calculation...")
    disp_doc = copy.deepcopy(calc_doc)
    disp_doc["task"] = "spectral"
    disp_doc["spectral_task"] = "bandstructure"
    # disable checkpointing for BS/DOS by default, leaving just SCF
    disp_doc["write_checkpoint"] = "none"
    disp_doc["pdos_calculate_weights"] = True
    disp_doc["write_cell_structure"] = True
    disp_doc["continuation"] = "default"

    required = []
    forbidden = ["spectral_kpoints_mp_spacing"]
    computer.validate_calc_doc(disp_doc, required, forbidden)

    success = computer.run_castep_singleshot(
        disp_doc, seed, keep=True, intermediate=True
    )

    if disp_doc.get("write_orbitals"):
        LOG.info("Planning to call orbitals2bands...")
        _cache_executable = copy.deepcopy(computer.executable)
        _cache_core = copy.deepcopy(computer.ncores)
        computer.ncores = 1
        computer.executable = "orbitals2bands"
        try:
            success = computer.run_generic(intermediate=True, mv_bad_on_failure=False)
        except Exception as exc:
            # best effort: keep the CASTEP result if orbitals2bands fails
            LOG.warning("Failed to call orbitals2bands, with error: {}".format(exc))
        finally:
            # restore the computer even if the call is interrupted by
            # something that is not an ``Exception`` (e.g. KeyboardInterrupt)
            computer.ncores = _cache_core
            computer.executable = _cache_executable

    return success
def optados_pdos(computer, _, seed):
    """Run an OptaDOS projected-DOS calculation.

    The OptaDOS parameters are read from the file chosen by
    ``_get_optados_fname``, forced to ``task = "pdos"`` with any
    ``pdispersion`` entry removed, and passed to ``_run_optados`` with
    the ``"dos"`` suffix.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling OptaDOS.
        _ : second parameter is required but ignored.
        seed (str): root filename of structure.

    Returns:
        the value returned by ``_run_optados``, or None if no OptaDOS
        input file was found.

    """
    odi_fname = _get_optados_fname(seed)
    if odi_fname is None:
        return None

    odi_dict, _ = arbitrary2dict(odi_fname)
    odi_dict["task"] = "pdos"
    odi_dict.pop("pdispersion", None)

    LOG.info(
        "Performing OptaDOS pDOS calculation with parameters from {}".format(
            odi_fname
        )
    )
    return _run_optados(computer, odi_dict, seed, suffix="dos")
def optados_dos_broadening(computer, _, seed):
    """Run an OptaDOS total-DOS broadening calculation.

    The OptaDOS parameters are read from the file chosen by
    ``_get_optados_fname``, forced to ``task = "dos"`` with any ``pdos``
    and ``pdispersion`` entries removed, and passed to ``_run_optados``
    with the ``"dos"`` suffix.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling OptaDOS.
        _ : second parameter is required but ignored.
        seed (str): root filename of structure.

    Returns:
        the value returned by ``_run_optados``, or None if no OptaDOS
        input file was found.

    """
    odi_fname = _get_optados_fname(seed)
    if odi_fname is None:
        return None

    odi_dict, _ = arbitrary2dict(odi_fname)
    # a plain DOS run: drop the projected-calculation keywords
    odi_dict["task"] = "dos"
    for projected_key in ("pdos", "pdispersion"):
        odi_dict.pop(projected_key, None)

    LOG.info(
        "Performing OptaDOS DOS broadening with parameters from {}".format(
            odi_fname
        )
    )
    return _run_optados(computer, odi_dict, seed, suffix="dos")
def optados_pdispersion(computer, _, seed):
    """Run an OptaDOS projected-dispersion calculation.

    The OptaDOS parameters are read from the file chosen by
    ``_get_optados_fname``, forced to ``task = "pdispersion"`` with any
    ``pdos`` entry removed, and passed to ``_run_optados`` with the
    ``"dispersion"`` suffix.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling OptaDOS.
        _ : second parameter is required but ignored.
        seed (str): root filename of structure.

    Returns:
        the value returned by ``_run_optados``, or None if no OptaDOS
        input file was found.

    """
    odi_fname = _get_optados_fname(seed)
    if odi_fname is None:
        return None

    odi_dict, _ = arbitrary2dict(odi_fname)
    # a projected dispersion run: the pdos keyword must not be present
    odi_dict["task"] = "pdispersion"
    odi_dict.pop("pdos", None)

    LOG.info(
        "Performing OptaDOS pDIS calculation with parameters from {}".format(
            odi_fname
        )
    )
    return _run_optados(computer, odi_dict, seed, suffix="dispersion")
def _run_optados(computer, odi_dict, seed, suffix=None):
    """Run OptaDOS with given computer object, parameters and seed, adjusting
    the number of cores/nodes and the executable to call, then restoring
    them after.

    The parameters are written to ``<seed>.odi`` (inside
    ``computer.compute_dir`` when that is set). If ``suffix`` is given, the
    matching ``<seed>.<ext>_<suffix>`` files are swapped into place before
    the run and the previous files are put back afterwards.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will
            be calling OptaDOS.
        odi_dict (dict): the OptaDOS parameters to write to file.
        seed (str): root filename of structure.

    Keyword arguments:
        suffix (str): either 'dos' or 'dispersion' for backup/restore.

    Returns:
        the value returned by ``computer.run_generic``, or False if that
        call raised an ``Exception`` (which is logged as a warning).

    """
    from matador.export import doc2arbitrary

    odi_path = "{}.odi".format(seed)
    if computer.compute_dir is not None:
        # os.path.join avoids producing "/<seed>.odi" for an empty compute_dir
        odi_path = os.path.join(computer.compute_dir, odi_path)
    doc2arbitrary(odi_dict, odi_path, overwrite=True)

    _cache_executable = copy.deepcopy(computer.executable)
    _cache_core = copy.deepcopy(computer.ncores)
    _cache_nodes = copy.deepcopy(computer.nnodes)

    if suffix is not None:
        _get_correct_files_for_optados(seed, suffix=suffix)

    success = False
    try:
        # OptaDOS is run on a single core of a single node
        computer.ncores = 1
        computer.nnodes = 1
        computer.executable = computer.optados_executable
        try:
            success = computer.run_generic(intermediate=True, mv_bad_on_failure=False)
        except Exception as exc:
            # best effort: a failed OptaDOS run is reported, not propagated
            LOG.warning("Failed to call optados with error: {}".format(exc))
    finally:
        # always hand the computer and the swapped files back in their
        # original state, even if the run was interrupted
        computer.ncores = _cache_core
        computer.nnodes = _cache_nodes
        computer.executable = _cache_executable
        if suffix is not None:
            _get_correct_files_for_optados(seed, suffix="bak")

    return success
def _get_optados_fname(seed):
"""Get the most likely OptaDOS input file name, which here means either
only existing one, or the shortest one.
Parameters:
seed (str): seedname for the calculation.
Returns:
str: the OptaDOS input filename.
"""
odi_fname = None
if os.path.isfile(seed + ".odi"):
os.remove(seed + ".odi")
if glob.glob("*.odi"):
# dodginess: choose the odi file with the shortest name...
shortest_fname = None
for fname in glob.glob("*.odi"):
if shortest_fname is None or len(fname) < len(shortest_fname):
shortest_fname = fname
odi_fname = shortest_fname
return odi_fname
def _get_correct_files_for_optados(seed, suffix=None):
    """Swap the ``<seed>.<ext>_<suffix>`` files into place as ``<seed>.<ext>``.

    For each handled extension, an existing ``<seed>.<ext>`` is removed
    (after being copied to ``<seed>.<ext>_bak``, unless ``suffix`` is
    'bak'), and ``<seed>.<ext>_<suffix>`` is then copied over it if that
    file exists. With ``suffix='bak'`` the ``_bak`` copy is deleted once it
    has been restored. Nothing is done when ``suffix`` is None.

    Parameters:
        seed (str): seedname for the calculation.

    Keyword arguments:
        suffix (str): either 'dos' or 'dispersion', or 'bak' to get old
            files back.

    """
    import shutil

    LOG.debug("Getting files for OptaDOS: {} {}".format(seed, suffix))

    if suffix is None:
        return

    restoring_backup = suffix == "bak"
    swapped_exts = (
        ".pdos_bin",
        ".dome_bin",
        "-out.cell",
        ".bands",
        ".cell",
        ".param",
    )

    for ext in swapped_exts:
        live_file = "{}{}".format(seed, ext)
        stored_file = "{}{}_{}".format(seed, ext, suffix)

        if os.path.isfile(live_file):
            if not restoring_backup:
                # keep a copy so that a later 'bak' call can undo the swap
                shutil.copy2(live_file, "{}{}_{}".format(seed, ext, "bak"))
            os.remove(live_file)

        if not os.path.isfile(stored_file):
            continue

        LOG.debug("Copying {} to {}".format(stored_file, live_file))
        shutil.copy2(stored_file, live_file)
        if restoring_backup:
            os.remove(stored_file)