# coding: utf-8
# Distributed under the terms of the MIT License.
""" This module implements the CastepSpectralWorkflow, which performs
spectral calculations with CASTEP in multiple steps:
1. Performs a singlepoint calculation (if check file is not found).
2. If the ``spectral_kpoints_mp_spacing`` keyword is found, interpolate
wavefunction to DOS grid.
- If an OptaDOS input file (.odi) with the root seedname
is found, run OptaDOS on the resulting density of states.
3. If ``spectral_kpoints_path_spacing`` keyword is present, create
a bandstructure on the SeeK-path generated path.
"""
import os
import copy
import glob
import logging
from matador.workflows import Workflow
from matador.scrapers import arbitrary2dict
from matador.workflows.castep.common import castep_scf
# Module-level logger shared by every step in this file; it is registered
# under the fixed name 'run3' rather than this module's ``__name__``.
LOG = logging.getLogger('run3')
def castep_full_spectral(computer, calc_doc, seed, **kwargs):
    """ Entry point for a "full" spectral calculation: build a
    :class:`CastepSpectralWorkflow` for the given structure and report
    whether it succeeded. The workflow itself covers the SCF, the
    interpolation onto DOS grids and/or dispersion paths, and any
    OptaDOS post-processing.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that
            will be calling CASTEP.
        calc_doc (dict): dictionary of structure and calculation
            parameters.
        seed (str): root seed for the calculation.

    Keyword arguments:
        **kwargs: accepted by this function but not forwarded to the
            workflow constructor.

    Raises:
        RuntimeError: if any part of the calculation fails.

    Returns:
        bool: the ``success`` attribute of the constructed workflow.

    """
    # NOTE(review): ``success`` is read straight after construction, so the
    # base Workflow presumably runs its steps on instantiation -- confirm.
    return CastepSpectralWorkflow(computer, calc_doc, seed).success
class CastepSpectralWorkflow(Workflow):
    """ Workflow for a "full" spectral calculation: an SCF followed by
    interpolation onto k-point grids and/or paths to obtain the DOS and
    dispersion, with optional OptaDOS post-processing of the results.

    Attributes:
        computer (:obj:`matador.compute.ComputeTask`): the object that calls CASTEP.
        calc_doc (dict): the interim dictionary of structural and
            calculation parameters.
        seed (str): the root seed for the calculation.
        success (bool): the status of the Workflow: only set to True after
            post-processing method completes.

    """

    def preprocess(self):
        """ Inspect the calculation parameters and the files already on
        disk to decide which steps have to run, register those steps,
        and fill in the CASTEP k-point settings they rely on.

        """
        # (name, function, input extensions, output extensions), listed in
        # the order in which the steps are registered
        step_table = (
            ('scf', castep_spectral_scf,
             ['.cell', '.param'],
             ['.castep', '.*err', '-out.cell']),
            ('dos', castep_spectral_dos,
             ['.cell', '.param'],
             ['.castep', '.bands', '.pdos_bin', '.dome_bin', '.*err', '-out.cell']),
            ('pdos', optados_pdos,
             ['.odi', '.pdos_bin', '.dome_bin'],
             ['.odo', '.*err']),
            ('broadening', optados_dos_broadening,
             ['.odi', '.pdos_bin', '.dome_bin'],
             ['.odo', '.*err']),
            ('dispersion', castep_spectral_dispersion,
             ['.cell', '.param'],
             ['.castep', '.bands', '.pdos_bin', '.dome_bin', '.*err', '-out.cell']),
            ('pdis', optados_pdispersion,
             ['.odi', '.pdos_bin'],
             ['.odo', '.*err']),
        )

        # only the SCF is switched on by default
        todo = {name: name == 'scf' for name, _, _, _ in step_table}

        # an existing checkpoint makes the initial SCF unnecessary
        if os.path.isfile(self.seed + '.check'):
            LOG.info('Found {}.check, so skipping initial SCF.'.format(self.seed))
            todo['scf'] = False

        # any path-like keyword (or an explicit bandstructure task) asks for a
        # dispersion, unless its output file is already present
        path_keys = ('spectral_kpoints_path',
                     'spectral_kpoints_list',
                     'spectral_kpoints_path_spacing')
        if (any(key in self.calc_doc for key in path_keys) or
                self.calc_doc.get('spectral_task', '').lower() == 'bandstructure'):
            todo['dispersion'] = not os.path.isfile(self.seed + '.bands_dispersion')

        # likewise for the DOS
        if ('spectral_kpoints_mp_spacing' in self.calc_doc or
                self.calc_doc.get('spectral_task', '').lower() == 'dos'):
            todo['dos'] = not os.path.isfile(self.seed + '.bands_dos')

        # OptaDOS steps only piggy-back on CASTEP steps that will actually run
        odi_fname = _get_optados_fname(self.seed)
        if odi_fname is not None:
            odi_dict, _ = arbitrary2dict(odi_fname)
            if todo['dispersion']:
                todo['pdis'] = 'pdispersion' in odi_dict
            if todo['dos']:
                todo['broadening'] = 'broadening' in odi_dict
                todo['pdos'] = 'pdos' in odi_dict

        for name, function, input_exts, output_exts in step_table:
            if todo[name]:
                self.add_step(function, name,
                              input_exts=input_exts,
                              output_exts=output_exts)

        user_supplied_kpoints = ('spectral_kpoints_list' in self.calc_doc or
                                 'spectral_kpoints_path' in self.calc_doc)
        if not user_supplied_kpoints:
            # no user-requested path: use the seekpath-compliant cell and
            # path returned by the computer
            from matador.utils.cell_utils import cart2abc
            prim_doc, kpt_path = self.computer.get_seekpath_compliant_input(
                self.calc_doc, self.calc_doc.get('spectral_kpoints_path_spacing', 0.05))
            self.calc_doc.update(prim_doc)
            self.calc_doc['lattice_abc'] = cart2abc(self.calc_doc['lattice_cart'])
            if todo['dispersion']:
                self.calc_doc['spectral_kpoints_list'] = kpt_path
        elif todo['dispersion'] and 'spectral_kpoints_path' in self.calc_doc:
            self._user_defined_kpt_path = True
            LOG.warning('Using user-defined k-point path for all structures.')

        # NOTE(review): the path spacing default is applied unconditionally
        # here; confirm it was not meant to be limited to the
        # user-defined-path branch above.
        self.calc_doc.setdefault('spectral_kpoints_path_spacing', 0.05)
        if todo['dos']:
            self.calc_doc.setdefault('spectral_kpoints_mp_spacing', 0.05)

        # always use continuation
        self.calc_doc['continuation'] = 'default'
def castep_spectral_scf(computer, calc_doc, seed):
    """ Perform the single-point SCF that the spectral steps build on.
    Works on a deep copy of ``calc_doc`` with all checkpoints written
    and every spectral keyword forbidden.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP.
        calc_doc (dict): the structure to run on.
        seed (str): root filename of structure.

    Returns:
        whatever :func:`castep_scf` returns for the modified document.

    """
    LOG.info('Performing CASTEP spectral SCF...')

    scf_doc = copy.deepcopy(calc_doc)
    scf_doc.update({
        'write_checkpoint': 'ALL',
        'task': 'singlepoint',
        'write_cell_structure': True,
    })
    # a plain singlepoint must not carry a spectral task
    scf_doc.pop('spectral_task', None)

    spectral_keys = [
        'spectral_task',
        'spectral_kpoints_list',
        'spectral_kpoints_path',
        'spectral_kpoints_mp_spacing',
        'spectral_kpoints_path_spacing',
    ]
    return castep_scf(computer, scf_doc, seed, required_keys=[], forbidden_keys=spectral_keys)
def castep_spectral_dos(computer, calc_doc, seed):
    """ Interpolate a completed SCF onto the DOS k-point grid with a
    CASTEP ``spectral``/``dos`` run. The parameters are taken from a deep
    copy of ``calc_doc``, so the caller's dictionary is left untouched.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP.
        calc_doc (dict): the structure to run on.
        seed (str): root filename of structure.

    Returns:
        the value returned by ``computer.run_castep_singleshot``.

    """
    LOG.info('Performing CASTEP spectral DOS calculation...')

    dos_doc = copy.deepcopy(calc_doc)
    dos_doc.update({
        'task': 'spectral',
        'spectral_task': 'dos',
        # disable checkpointing for BS/DOS by default, leaving just SCF
        'write_checkpoint': 'none',
        'write_cell_structure': True,
        'pdos_calculate_weights': True,
    })

    # a DOS needs a grid spacing and must not carry any path keywords
    required_keys = ['spectral_kpoints_mp_spacing']
    forbidden_keys = ['spectral_kpoints_list',
                      'spectral_kpoints_path',
                      'spectral_kpoints_path_spacing']
    computer.validate_calc_doc(dos_doc, required_keys, forbidden_keys)

    return computer.run_castep_singleshot(dos_doc, seed, keep=True, intermediate=True)
def castep_spectral_dispersion(computer, calc_doc, seed):
    """ Runs a dispersion interpolation on top of a completed SCF calculation,
    optionally running orbitals2bands afterwards when ``write_orbitals`` is
    set in the calculation parameters.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling CASTEP.
        calc_doc (dict): the structure to run on.
        seed (str): root filename of structure.

    Returns:
        the result of the CASTEP run, or the result of the orbitals2bands
        call if that was attempted and did not raise.

    """
    LOG.info('Performing CASTEP spectral dispersion calculation...')
    disp_doc = copy.deepcopy(calc_doc)
    disp_doc['task'] = 'spectral'
    disp_doc['spectral_task'] = 'bandstructure'
    # disable checkpointing for BS/DOS by default, leaving just SCF
    disp_doc['write_checkpoint'] = 'none'
    disp_doc['pdos_calculate_weights'] = True
    disp_doc['write_cell_structure'] = True
    disp_doc['continuation'] = 'default'

    required = []
    forbidden = ['spectral_kpoints_mp_spacing']
    computer.validate_calc_doc(disp_doc, required, forbidden)

    success = computer.run_castep_singleshot(disp_doc, seed, keep=True, intermediate=True)

    if disp_doc.get('write_orbitals'):
        LOG.info('Planning to call orbitals2bands...')
        # orbitals2bands is run on a single core with its own executable, so
        # remember the CASTEP settings in order to put them back afterwards
        _cache_executable = copy.deepcopy(computer.executable)
        _cache_core = copy.deepcopy(computer.ncores)
        computer.ncores = 1
        computer.executable = 'orbitals2bands'
        try:
            success = computer.run_generic(intermediate=True, mv_bad_on_failure=False)
        except Exception as exc:
            # best effort: a failed orbitals2bands leaves the CASTEP result
            # as the reported status
            LOG.warning('Failed to call orbitals2bands, with error: {}'.format(exc))
        finally:
            # restore on every exit path, including interrupts that are not
            # caught by the handler above
            computer.ncores = _cache_core
            computer.executable = _cache_executable

    return success
def optados_pdos(computer, _, seed):
    """ Perform a projected-DOS calculation with OptaDOS, using the
    parameters of the OptaDOS input file located by
    :func:`_get_optados_fname`.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS.
        _ : second parameter is required but ignored.
        seed (str): root filename of structure.

    Returns:
        the result of :func:`_run_optados`, or None if no OptaDOS input
        file was found.

    """
    odi_fname = _get_optados_fname(seed)
    if odi_fname is None:
        return None

    odi_dict, _ = arbitrary2dict(odi_fname)
    odi_dict['task'] = 'pdos'
    # the projected dispersion is handled by its own step
    odi_dict.pop('pdispersion', None)

    LOG.info('Performing OptaDOS pDOS calculation with parameters from {}'.format(odi_fname))
    return _run_optados(computer, odi_dict, seed, suffix='dos')
def optados_dos_broadening(computer, _, seed):
    """ Perform a total-DOS broadening with OptaDOS, using the parameters
    of the OptaDOS input file located by :func:`_get_optados_fname`.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS.
        _ : second parameter is required but ignored.
        seed (str): root filename of structure.

    Returns:
        the result of :func:`_run_optados`, or None if no OptaDOS input
        file was found.

    """
    odi_fname = _get_optados_fname(seed)
    if odi_fname is None:
        return None

    odi_dict, _ = arbitrary2dict(odi_fname)
    # run a normal DOS task, without any of the projection keywords
    odi_dict['task'] = 'dos'
    for projection_key in ('pdos', 'pdispersion'):
        odi_dict.pop(projection_key, None)

    LOG.info('Performing OptaDOS DOS broadening with parameters from {}'.format(odi_fname))
    return _run_optados(computer, odi_dict, seed, suffix='dos')
def optados_pdispersion(computer, _, seed):
    """ Perform a projected dispersion calculation with OptaDOS, using the
    parameters of the OptaDOS input file located by
    :func:`_get_optados_fname`.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS.
        _ : second parameter is required but ignored.
        seed (str): root filename of structure.

    Returns:
        the result of :func:`_run_optados`, or None if no OptaDOS input
        file was found.

    """
    odi_fname = _get_optados_fname(seed)
    if odi_fname is None:
        return None

    odi_dict, _ = arbitrary2dict(odi_fname)
    # run a pdispersion task; the pDOS is handled by its own step
    odi_dict['task'] = 'pdispersion'
    odi_dict.pop('pdos', None)

    LOG.info('Performing OptaDOS pDIS calculation with parameters from {}'.format(odi_fname))
    return _run_optados(computer, odi_dict, seed, suffix='dispersion')
def _run_optados(computer, odi_dict, seed, suffix=None):
    """ Run OptaDOS with given computer object, parameters and seed, adjusting
    the number of cores/nodes and the executable to call, then restoring them
    after.

    The parameters are written to ``<seed>.odi`` (inside
    ``computer.compute_dir`` when that is set). If a suffix is given, the
    matching ``<seed>.<ext>_<suffix>`` files are moved into place before the
    run and the previous files are restored from their backups afterwards.

    Parameters:
        computer (:obj:`matador.compute.ComputeTask`): the object that will be calling OptaDOS.
        odi_dict (dict): the OptaDOS parameters to write to file.
        seed (str): root filename of structure.

    Keyword arguments:
        suffix (str): either 'dos' or 'dispersion' for backup/restore.

    Returns:
        the result of ``computer.run_generic``, or False if that call
        raised an exception.

    """
    from matador.export import doc2arbitrary

    odi_path = '{}.odi'.format(seed)
    if computer.compute_dir is not None:
        odi_path = os.path.join(computer.compute_dir, odi_path)
    doc2arbitrary(odi_dict, odi_path, overwrite=True)

    if suffix is not None:
        _get_correct_files_for_optados(seed, suffix=suffix)

    # OptaDOS is run on one core of one node with its own executable, so
    # remember the current settings in order to put them back afterwards
    _cache_executable = copy.deepcopy(computer.executable)
    _cache_core = copy.deepcopy(computer.ncores)
    _cache_nodes = copy.deepcopy(computer.nnodes)

    success = False
    try:
        computer.ncores = 1
        computer.nnodes = 1
        computer.executable = computer.optados_executable
        success = computer.run_generic(intermediate=True, mv_bad_on_failure=False)
    except Exception as exc:
        # best effort: a failed OptaDOS run is reported via the return value
        LOG.warning('Failed to call optados with error: {}'.format(exc))
    finally:
        # restore the computer and the swapped files on every exit path,
        # including interrupts that are not caught by the handler above
        computer.ncores = _cache_core
        computer.nnodes = _cache_nodes
        computer.executable = _cache_executable
        if suffix is not None:
            _get_correct_files_for_optados(seed, suffix='bak')

    return success
def _get_optados_fname(seed):
    """ Pick the OptaDOS input file to use as a template: of all the
    ``*.odi`` files in the current directory, the one with the shortest
    name. Any existing ``<seed>.odi`` is deleted before looking, so it is
    never the file that gets picked.

    Parameters:
        seed (str): seedname for the calculation.

    Returns:
        str: the OptaDOS input filename, or None if no ``.odi`` file is left.

    """
    seed_odi = seed + '.odi'
    if os.path.isfile(seed_odi):
        os.remove(seed_odi)

    # dodginess: choose the odi file with the shortest name...
    return min(glob.glob('*.odi'), key=len, default=None)
def _get_correct_files_for_optados(seed, suffix=None):
    """ Put the ``<seed>.<ext>_<suffix>`` files left by an earlier
    dispersion/DOS run in place as the current ``<seed>.<ext>`` files.
    Whatever was there before is kept as ``<seed>.<ext>_bak``; calling
    again with ``suffix='bak'`` moves those backups back and deletes them.

    Parameters:
        seed (str): seedname for the calculation.

    Keyword arguments:
        suffix (str): either 'dos' or 'dispersion', or 'bak' to get old files back.

    """
    import shutil

    LOG.debug('Getting files for OptaDOS: {} {}'.format(seed, suffix))
    if suffix is None:
        return

    restoring = suffix == 'bak'
    for ext in ('.pdos_bin', '.dome_bin', '-out.cell', '.bands', '.cell', '.param'):
        live_file = '{}{}'.format(seed, ext)
        stashed_file = '{}{}_{}'.format(seed, ext, suffix)

        # clear the live file out of the way, backing it up first unless
        # the backups themselves are what is being restored
        if os.path.isfile(live_file):
            if not restoring:
                shutil.copy2(live_file, '{}{}_{}'.format(seed, ext, 'bak'))
            os.remove(live_file)

        if not os.path.isfile(stashed_file):
            continue

        LOG.debug('Copying {} to {}'.format(stashed_file, live_file))
        shutil.copy2(stashed_file, live_file)
        # backups are single-use
        if restoring:
            os.remove(stashed_file)