Source code for matador.workflows.castep.spectral

# coding: utf-8
# Distributed under the terms of the MIT License.

""" This module implements the CastepSpectralWorkflow, which performs
spectral calculations with CASTEP in multiple steps:

1. Performs a singlepoint calculation (if check file is not found).
2. If the ``spectral_kpoints_mp_spacing`` keyword is found, interpolate
   wavefunction to DOS grid.
   - If an OptaDOS input file (.odi) with the root seedname
     is found, run OptaDOS on the resulting density of states.
3. If ``spectral_kpoints_path_spacing`` keyword is present, create
   a bandstructure on the SeeK-path generated path.

"""


import os
import copy
import glob
import logging
from matador.workflows import Workflow
from matador.scrapers import arbitrary2dict
from matador.workflows.castep.common import castep_scf

LOG = logging.getLogger("run3")


[docs]def castep_full_spectral(computer, calc_doc, seed, **kwargs): """Perform a "full" spectral calculation on a system, i.e. first perform an SCF then interpolate to different kpoint paths/grids to form DOS and dispersions. Optionally use OptaDOS for post-processing of DOS. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP. calc_doc (dict): dictionary of structure and calculation parameters. seed (str): root seed for the calculation. Raises: RuntimeError: if any part of the calculation fails. Returns: bool: True if Workflow completed successfully, or False otherwise. """ workflow = CastepSpectralWorkflow(computer, calc_doc, seed) return workflow.success
[docs]class CastepSpectralWorkflow(Workflow): """Perform a "full" spectral calculation on a system, i.e. first perform an SCF then interpolate to different kpoint paths/grids to form DOS and dispersions. Optionally use OptaDOS for post-processing of DOS. Attributes: computer (:obj:`matador.compute.ComputeTask`): the object that calls CASTEP. calc_doc (dict): the interim dictionary of structural and calculation parameters. seed (str): the root seed for the calculation. success (bool): the status of the Workflow: only set to True after post-processing method completes. """
[docs] def preprocess(self): """Decide which parts of the Workflow need to be performed, and set the appropriate CASTEP parameters. """ # default todo todo = { "scf": True, "dos": False, "pdos": False, "broadening": False, "dispersion": False, "pdis": False, } # definition of steps and names steps = { "scf": castep_spectral_scf, "dos": castep_spectral_dos, "pdos": optados_pdos, "broadening": optados_dos_broadening, "dispersion": castep_spectral_dispersion, "pdis": optados_pdispersion, } exts = { "scf": { "input": [".cell", ".param"], "output": [".castep", ".*err", "-out.cell"], }, "dos": { "input": [".cell", ".param"], "output": [ ".castep", ".bands", ".pdos_bin", ".dome_bin", ".*err", "-out.cell", ], }, "dispersion": { "input": [".cell", ".param"], "output": [ ".castep", ".bands", ".pdos_bin", ".dome_bin", ".*err", "-out.cell", ], }, "pdis": {"input": [".odi", ".pdos_bin"], "output": [".odo", ".*err"]}, "pdos": { "input": [".odi", ".pdos_bin", ".dome_bin"], "output": [".odo", ".*err"], }, "broadening": { "input": [".odi", ".pdos_bin", ".dome_bin"], "output": [".odo", ".*err"], }, } if os.path.isfile(self.seed + ".check"): LOG.info("Found {}.check, so skipping initial SCF.".format(self.seed)) todo["scf"] = False if ( "spectral_kpoints_path" in self.calc_doc or "spectral_kpoints_list" in self.calc_doc or "spectral_kpoints_path_spacing" in self.calc_doc or self.calc_doc.get("spectral_task", "").lower() == "bandstructure" ): todo["dispersion"] = not os.path.isfile(self.seed + ".bands_dispersion") if ( "spectral_kpoints_mp_spacing" in self.calc_doc or self.calc_doc.get("spectral_task", "").lower() == "dos" ): todo["dos"] = not os.path.isfile(self.seed + ".bands_dos") odi_fname = _get_optados_fname(self.seed) if odi_fname is not None: odi_dict, _ = arbitrary2dict(odi_fname) if todo["dispersion"]: todo["pdis"] = "pdispersion" in odi_dict if todo["dos"]: todo["broadening"] = "broadening" in odi_dict todo["pdos"] = "pdos" in odi_dict for key in todo: if todo[key]: self.add_step( steps[key], key, input_exts=exts[key].get("input"), output_exts=exts[key].get("output"), ) # if not using a user-requested path, use seekpath and spglib # to reduce to primitive and use consistent path if ( "spectral_kpoints_list" not in self.calc_doc and "spectral_kpoints_path" not in self.calc_doc ): from matador.utils.cell_utils import cart2abc prim_doc, kpt_path = self.computer.get_seekpath_compliant_input( self.calc_doc, self.calc_doc.get("spectral_kpoints_path_spacing", 0.05) ) self.calc_doc.update(prim_doc) self.calc_doc["lattice_abc"] = cart2abc(self.calc_doc["lattice_cart"]) if todo["dispersion"]: self.calc_doc["spectral_kpoints_list"] = kpt_path elif todo["dispersion"] and "spectral_kpoints_path" in self.calc_doc: self._user_defined_kpt_path = True LOG.warning("Using user-defined k-point path for all structures.") self.calc_doc["spectral_kpoints_path_spacing"] = self.calc_doc.get( "spectral_kpoints_path_spacing", 0.05 ) if todo["dos"]: self.calc_doc["spectral_kpoints_mp_spacing"] = self.calc_doc.get( "spectral_kpoints_mp_spacing", 0.05 ) # always use continuation self.calc_doc["continuation"] = "default"
[docs]def castep_spectral_scf(computer, calc_doc, seed): """Run a singleshot SCF calculation. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP. calc_doc (dict): the structure to run on. seed (str): root filename of structure. """ LOG.info("Performing CASTEP spectral SCF...") scf_doc = copy.deepcopy(calc_doc) scf_doc["write_checkpoint"] = "ALL" scf_doc["task"] = "singlepoint" scf_doc["write_cell_structure"] = True if "spectral_task" in scf_doc: del scf_doc["spectral_task"] required = [] forbidden = [ "spectral_task", "spectral_kpoints_list", "spectral_kpoints_path", "spectral_kpoints_mp_spacing", "spectral_kpoints_path_spacing", ] return castep_scf( computer, scf_doc, seed, required_keys=required, forbidden_keys=forbidden )
[docs]def castep_spectral_dos(computer, calc_doc, seed): """Runs a DOS interpolation on top of a completed SCF. If a single .odi file is found, run OptaDOS on the resulting DOS. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP. calc_doc (dict): the structure to run on. seed (str): root filename of structure. """ LOG.info("Performing CASTEP spectral DOS calculation...") dos_doc = copy.deepcopy(calc_doc) dos_doc["task"] = "spectral" dos_doc["spectral_task"] = "dos" # disable checkpointing for BS/DOS by default, leaving just SCF dos_doc["write_checkpoint"] = "none" dos_doc["write_cell_structure"] = True dos_doc["pdos_calculate_weights"] = True required = ["spectral_kpoints_mp_spacing"] forbidden = [ "spectral_kpoints_list", "spectral_kpoints_path", "spectral_kpoints_path_spacing", ] computer.validate_calc_doc(dos_doc, required, forbidden) success = computer.run_castep_singleshot( dos_doc, seed, keep=True, intermediate=True ) return success
[docs]def castep_spectral_dispersion(computer, calc_doc, seed): """Runs a dispersion interpolation on top of a completed SCF calculation, optionally running orbitals2bands and OptaDOS projected dispersion. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP. calc_doc (dict): the structure to run on. seed (str): root filename of structure. """ LOG.info("Performing CASTEP spectral dispersion calculation...") disp_doc = copy.deepcopy(calc_doc) disp_doc["task"] = "spectral" disp_doc["spectral_task"] = "bandstructure" # disable checkpointing for BS/DOS by default, leaving just SCF disp_doc["write_checkpoint"] = "none" disp_doc["pdos_calculate_weights"] = True disp_doc["write_cell_structure"] = True disp_doc["continuation"] = "default" required = [] forbidden = ["spectral_kpoints_mp_spacing"] computer.validate_calc_doc(disp_doc, required, forbidden) success = computer.run_castep_singleshot( disp_doc, seed, keep=True, intermediate=True ) if disp_doc.get("write_orbitals"): LOG.info("Planning to call orbitals2bands...") _cache_executable = copy.deepcopy(computer.executable) _cache_core = copy.deepcopy(computer.ncores) computer.ncores = 1 computer.executable = "orbitals2bands" try: success = computer.run_generic(intermediate=True, mv_bad_on_failure=False) except Exception as exc: computer.executable = _cache_executable computer.ncores = _cache_core LOG.warning("Failed to call orbitals2bands, with error: {}".format(exc)) computer.ncores = _cache_core computer.executable = _cache_executable return success
[docs]def optados_pdos(computer, _, seed): """Run an OptaDOS projected-DOS. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS. _ : second parameter is required but ignored. seed (str): root filename of structure. """ odi_fname = _get_optados_fname(seed) if odi_fname is not None: odi_dict, _ = arbitrary2dict(odi_fname) odi_dict["task"] = "pdos" if "pdispersion" in odi_dict: del odi_dict["pdispersion"] LOG.info( "Performing OptaDOS pDOS calculation with parameters from {}".format( odi_fname ) ) success = _run_optados(computer, odi_dict, seed, suffix="dos") return success return None
[docs]def optados_dos_broadening(computer, _, seed): """Run an OptaDOS total DOS broadening. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS. _ : second parameter is required but ignored. seed (str): root filename of structure. """ odi_fname = _get_optados_fname(seed) if odi_fname is not None: odi_dict, _ = arbitrary2dict(odi_fname) # if broadening keyword is present, try to run a normal DOS odi_dict["task"] = "dos" if "pdos" in odi_dict: del odi_dict["pdos"] if "pdispersion" in odi_dict: del odi_dict["pdispersion"] LOG.info( "Performing OptaDOS DOS broadening with parameters from {}".format( odi_fname ) ) return _run_optados(computer, odi_dict, seed, suffix="dos") return None
[docs]def optados_pdispersion(computer, _, seed): """Runs an OptaDOS projected dispersion calculation. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS. _ : second parameter is required but ignored. seed (str): root filename of structure. """ odi_fname = _get_optados_fname(seed) if odi_fname is not None: odi_dict, _ = arbitrary2dict(odi_fname) # if pdispersion keyword is present, try to run a pdis odi_dict["task"] = "pdispersion" if "pdos" in odi_dict: del odi_dict["pdos"] LOG.info( "Performing OptaDOS pDIS calculation with parameters from {}".format( odi_fname ) ) return _run_optados(computer, odi_dict, seed, suffix="dispersion") return None
def _run_optados(computer, odi_dict, seed, suffix=None): """Run OptaDOS with given computer object, parameters and seed, adjusting the number of cores and the executable to call, then restoring them after. Parameters: computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS. odi_dict (dict): the OptaDOS parameters to write to file. seed (str): root filename of structure. Keyword arguments: suffix (str): either 'dos' or 'dispersion' for backup/restore. executable (str): optados executable to call. """ from matador.export import doc2arbitrary odi_path = "{}.odi".format(seed) if computer.compute_dir is not None: odi_path = computer.compute_dir + "/" + odi_path doc2arbitrary(odi_dict, odi_path, overwrite=True) if suffix is not None: _get_correct_files_for_optados(seed, suffix=suffix) _cache_executable = copy.deepcopy(computer.executable) _cache_core = copy.deepcopy(computer.ncores) _cache_nodes = copy.deepcopy(computer.nnodes) computer.ncores = 1 computer.nnodes = 1 computer.executable = computer.optados_executable success = False try: success = computer.run_generic(intermediate=True, mv_bad_on_failure=False) except Exception as exc: LOG.warning("Failed to call optados with error: {}".format(exc)) computer.ncores = _cache_core computer.nnodes = _cache_nodes computer.executable = _cache_executable if suffix is not None: _get_correct_files_for_optados(seed, suffix="bak") return success def _get_optados_fname(seed): """Get the most likely OptaDOS input file name, which here means either only existing one, or the shortest one. Parameters: seed (str): seedname for the calculation. Returns: str: the OptaDOS input filename. """ odi_fname = None if os.path.isfile(seed + ".odi"): os.remove(seed + ".odi") if glob.glob("*.odi"): # dodginess: choose the odi file with the shortest name... shortest_fname = None for fname in glob.glob("*.odi"): if shortest_fname is None or len(fname) < len(shortest_fname): shortest_fname = fname odi_fname = shortest_fname return odi_fname def _get_correct_files_for_optados(seed, suffix=None): """If e.g. dispersion and DOS calculations were run previously, but it is unclear which exists as the current <seed>.<ext>, try to copy the old <seed>.<ext>_dispersion/dos files to the correct place. Parameters: seed (str): seedname for the calculation. Keyword arguments: suffix (str): either 'dos' or 'dispersion', or 'bak' to get old files back. """ import shutil LOG.debug("Getting files for OptaDOS: {} {}".format(seed, suffix)) if suffix is None: return files_to_cache = [ ".pdos_bin", ".dome_bin", "-out.cell", ".bands", ".cell", ".param", ] for ext in files_to_cache: old_file = "{}{}_{}".format(seed, ext, suffix) current_file = "{}{}".format(seed, ext) if os.path.isfile(current_file): if suffix != "bak": backup_file = "{}{}_{}".format(seed, ext, "bak") shutil.copy2(current_file, backup_file) os.remove(current_file) if os.path.isfile(old_file): LOG.debug("Copying {} to {}".format(old_file, current_file)) shutil.copy2(old_file, current_file) if suffix == "bak": os.remove(old_file)