# coding: utf-8
# Distributed under the terms of the MIT license.
""" This file implements the BatchRun class for chaining
ComputeTask instances across several structures with
high-throughput.
"""
from collections import defaultdict
import multiprocessing as mp
import os
import glob
import time
import random
import psutil
from matador.utils.print_utils import print_failure, print_warning
from matador.compute.queueing import get_queue_manager
from matador.scrapers.castep_scrapers import cell2dict, param2dict
from matador.compute.compute import ComputeTask
from matador.utils.errors import (
InputError,
CalculationError,
MaxMemoryEstimateExceeded,
NodeCollisionError,
)


class BatchRun:
"""A class that implements the running of multiple generic jobs on
a series of files without collisions with other nodes using the
ComputeTask class. Jobs that have been started are listed in
``jobs.txt``, failed jobs are moved to ``bad_castep/``, completed jobs
are moved to ``completed/``.
Interface initially inspired by on run.pl, run2.pl and PyAIRSS class
CastepRunner.
"""
def __init__(self, seed, **kwargs):
"""Check directory has valid contents and prepare log files
and directories if not already prepared, then begin running
calculations.
Note:
This class is usually initialised by the run3 script, which
has a full description of possible arguments.
Parameters:
            seed (:obj:`list` of :obj:`str`): a single entry giving the
                param/cell file seed for CASTEP geometry optimisations of
                res files, or a list of ``$seed`` filenames on which to
                run an arbitrary executable, e.g. ``['LiAs']`` if
                LiAs.cell and LiAs.param exist in a cwd full of res
                files, or ``['LiAs_1', 'LiAs_2']`` if LiAs_1.in and
                LiAs_2.in exist and the executable is
                ``'pw6.x < $seed.in'``.
Keyword arguments:
Exhaustive list found in argparse parser inside `matador/cli/run3.py`.
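
        Example:
            A minimal sketch: the seed name and keyword values here are
            illustrative assumptions, not prescriptions::

                # geometry-optimise every res file in the cwd with CASTEP,
                # using LiAs.cell and LiAs.param as input templates
                runner = BatchRun(['LiAs'], ncores=4, nprocesses=2)
                runner.spawn()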
"""
# parse args, then co-opt them for passing directly into ComputeTask
prop_defaults = {
"ncores": None,
"nprocesses": 1,
"nnodes": 1,
"executable": "castep",
"no_reopt": False,
"mode": None,
"redirect": None,
"debug": False,
"custom_params": False,
"verbosity": 0,
"archer": False,
"slurm": False,
"intel": False,
"conv_cutoff": False,
"conv_kpt": False,
"memcheck": False,
"maxmem": None,
"killcheck": True,
"scratch_prefix": None,
"kpts_1D": False,
"spin": None,
"ignore_jobs_file": False,
"rough": 4,
"rough_iter": 2,
"fine_iter": 20,
"max_walltime": None,
"limit": None,
"profile": False,
"polltime": 30,
}
self.args = {}
self.args.update(prop_defaults)
self.args.update(kwargs)
self.debug = self.args.get("debug")
self.seed = seed
# if only one seed, check if it is a file, and if so treat
# this run as a generic run, not a CASTEP cell/param run
        if isinstance(self.seed, list) and len(self.seed) == 1:
if "*" in self.seed[0]:
self.seed = glob.glob(self.seed[0])
elif not os.path.isfile(self.seed[0]):
self.seed = self.seed[0]
self.compute_dir = os.uname()[1]
if self.args.get("scratch_prefix") not in [None, "."]:
self.compute_dir = "{}/{}".format(
self.args["scratch_prefix"], self.compute_dir
).replace("//", "/")
elif self.args.get("scratch_prefix") == ".":
self.compute_dir = None
if self.args.get("mode") is not None:
self.mode = self.args.get("mode")
else:
if isinstance(self.seed, str):
self.mode = "castep"
else:
self.mode = "generic"
del self.args["mode"]
if self.args.get("no_reopt"):
self.args["reopt"] = False
else:
self.args["reopt"] = True
if "no_reopt" in self.args:
del self.args["no_reopt"]
self.nprocesses = int(self.args["nprocesses"])
del self.args["nprocesses"]
self.limit = self.args.get("limit")
del self.args["limit"]
self.maxmem = self.args.get("maxmem")
del self.args["maxmem"]
self.max_walltime = self.args.get("max_walltime")
# detect and scrape queue settings
self.queue_mgr = get_queue_manager()
if self.queue_mgr is not None:
if self.maxmem is None:
self.maxmem = self.queue_mgr.max_memory
if self.max_walltime is None:
self.max_walltime = self.queue_mgr.walltime
self.start_time = None
if self.max_walltime is not None:
self.start_time = time.time()
# assign number of cores
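        # (physical cores only: logical=False makes psutil skip hyperthreads)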
self.all_cores = psutil.cpu_count(logical=False)
if self.args.get("ncores") is None:
if self.queue_mgr is None:
self.args["ncores"] = int(self.all_cores / self.nprocesses)
else:
self.args["ncores"] = int(self.queue_mgr.ntasks / self.nprocesses)
if self.args["nnodes"] < 1 or self.args["ncores"] < 1 or self.nprocesses < 1:
raise InputError("Invalid number of cores, nodes or processes.")
if self.all_cores < self.nprocesses:
raise InputError(
"Requesting more processes than available cores: {} vs {}".format(
self.all_cores, self.nprocesses
)
)
# scrape input cell/param/other files
if self.mode == "castep":
self.castep_setup()
else:
self.generic_setup()
# prepare folders and text files
self.paths = dict()
if self.args.get("conv_cutoff"):
self.paths["completed_dir"] = "completed_cutoff"
elif self.args.get("conv_kpt"):
self.paths["completed_dir"] = "completed_kpts"
else:
self.paths["completed_dir"] = "completed"
self.paths["failed_dir"] = "bad_castep"
self.paths["jobs_fname"] = "jobs.txt"
self.paths["completed_fname"] = "finished_cleanly.txt"
self.paths["failures_fname"] = "failures.txt"
self.paths["memory_fname"] = "memory_exceeded.txt"
if not os.path.isfile(self.paths["jobs_fname"]):
with open(self.paths["jobs_fname"], "a"):
pass
if not os.path.isfile(self.paths["completed_fname"]):
with open(self.paths["completed_fname"], "a"):
pass
if not os.path.isfile(self.paths["failures_fname"]):
with open(self.paths["failures_fname"], "a"):
pass
if self.args.get("memcheck"):
if not os.path.isfile(self.paths["memory_fname"]):
with open(self.paths["memory_fname"], "a"):
pass

    def spawn(self, join=False):
"""Spawn processes to perform calculations.
Keyword arguments:
join (bool): whether or not to attach to ComputeTask
process. Useful for testing.
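
        Example:
            A sketch of blocking use in a test, assuming a directory
            prepared as described in the class docstring::

                runner = BatchRun(['LiAs'])
                runner.spawn(join=True)  # block on each process in turn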
"""
procs = []
error_queue = mp.Queue()
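        # each worker is expected to put a (proc_id, result, seed) tuple on
        # this queue, where result is an Exception on failure (inferred from
        # the handling below)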
for proc_id in range(self.nprocesses):
procs.append(
mp.Process(
target=self.perform_new_calculations,
args=(
random.sample(
self.file_lists["res"], len(self.file_lists["res"])
),
error_queue,
proc_id,
),
)
)
for proc in procs:
proc.start()
if join:
proc.join()
errors = []
failed_seeds = []
# wait for each proc to write to error queue
try:
            for _ in procs:
result = error_queue.get()
if isinstance(result[1], Exception):
errors.append(result)
failed_seeds.append(result[2])
if errors:
error_message = ""
for error in errors:
error_message += "Process {} raised error(s): {}. ".format(
error[0], error[1]
)
                # if every process raised the same type of error, re-raise
                # that type with the combined message; otherwise bundle them
                if len({type(error[1]) for error in errors}) == 1:
                    raise type(errors[0][1])(error_message)
                raise BundledErrors(error_message)
        # the only errors that reach here are fatal RuntimeError subclasses,
        # e.g. WalltimeError, CriticalError, InputError
except RuntimeError as err:
            # give each process a chance to exit cleanly, then terminate
            # any that are still alive
            _ = [proc.join(timeout=2) for proc in procs]
            _ = [proc.terminate() for proc in procs if proc.is_alive()]
try:
# Guard against failures to write to stdout
print_failure("Fatal error(s) reported:")
print_warning(err)
except OSError:
pass
raise err
try:
# Guard against failures to write to stdout
print("Nothing left to do.")
except OSError:
pass

    def generic_setup(self):
"""Undo things that are set ready for CASTEP jobs..."""
self.cell_dict = None
self.param_dict = None
# scan directory for files to run
self.file_lists = defaultdict(list)
self.file_lists["res"] = glob.glob("*.res")

    def castep_setup(self):
"""Set up CASTEP jobs from res files, and $seed.cell/param."""
# read cell/param files
exts = ["cell", "param"]
for ext in exts:
if not os.path.isfile("{}.{}".format(self.seed, ext)):
raise InputError(
"Failed to find {ext} file, {seed}.{ext}".format(
ext=ext, seed=self.seed
)
)
self.cell_dict, cell_success = cell2dict(
self.seed + ".cell", db=False, lattice=False, positions=True
)
if not cell_success:
raise InputError(f"Failed to parse cell file: {self.cell_dict}")
self.param_dict, param_success = param2dict(self.seed + ".param", db=False)
if not param_success:
raise InputError(f"Failed to parse param file: {self.param_dict}")
# scan directory for files to run
self.file_lists = defaultdict(list)
self.file_lists["res"] = glob.glob("*.res")
if any(
self.seed == file.replace(".res", "") for file in self.file_lists["res"]
):
error = "Found .res file with same name as seed: {}.res. This will wreak havoc on your calculations!\n".format(
self.seed
) + "Please rename either your seed.cell/seed.param files, or rename the offending {}.res".format(
self.seed
)
raise InputError(error)
if not self.file_lists["res"]:
error = "run3 in CASTEP mode requires at least 1 res file in folder, found {}".format(
len(self.file_lists["res"])
)
raise InputError(error)
if len(self.file_lists["res"]) < self.nprocesses and not any(
[self.args.get("conv_cutoff"), self.args.get("conv_kpt")]
):
raise InputError("Requested more processes than there are jobs to run!")
# do some prelim checks of parameters
if self.param_dict["task"].upper() in [
"GEOMETRYOPTIMISATION",
"GEOMETRYOPTIMIZATION",
]:
if "geom_max_iter" not in self.param_dict:
raise InputError("geom_max_iter is unset, please fix this.")
if int(self.param_dict["geom_max_iter"]) <= 0:
raise InputError(
"geom_max_iter is only {}!".format(self.param_dict["geom_max_iter"])
)
# parse convergence args and set them up
self.convergence_run_setup()
# delete source from cell and param
del self.cell_dict["source"]
del self.param_dict["source"]

    def convergence_run_setup(self):
"""Set the correct args for a convergence run."""
# check if we're doing a conv run
if self.args.get("conv_cutoff"):
if os.path.isfile("cutoff.conv"):
with open("cutoff.conv", "r") as f:
flines = f.readlines()
self.args["conv_cutoff"] = []
for line in flines:
if not line.startswith("#"):
self.args["conv_cutoff"].append(int(line))
else:
raise InputError("Missing cutoff.conv file")
else:
self.args["conv_cutoff"] = None
if self.args.get("conv_kpt"):
if os.path.isfile("kpt.conv"):
with open("kpt.conv", "r") as f:
flines = f.readlines()
self.args["conv_kpt"] = []
for line in flines:
if not line.startswith("#"):
self.args["conv_kpt"].append(float(line))
else:
raise InputError("Missing with conv.kpt file")
else:
self.args["conv_kpt"] = None
def _check_jobs_file(self, res):
"""Check if structure is listed in jobs.txt file.
Parameters:
res (str): structure name.
Returns:
bool: True if already listed in jobs file.
"""
with open(self.paths["jobs_fname"], "r") as job_file:
flines = job_file.readlines()
for line in flines:
if res in line:
return True
return False


class BundledErrors(Exception):
"""Raise this after collecting all exceptions from
processes.
"""


def reset_job_folder(debug=False):
"""Remove all lock files and clean up jobs.txt
ready for job restart.
    Note:
        This should not be called by a ComputeTask instance, in case
        other instances are running.
Returns:
num_remaining (int): number of structures left to relax
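
    Example:
        A minimal usage sketch::

            num_remaining = reset_job_folder()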
"""
res_list = glob.glob("*.res")
for f in res_list:
root = f.replace(".res", "")
exts_to_rm = ["res.lock", "kill"]
for ext in exts_to_rm:
if os.path.isfile("{}.{}".format(root, ext)):
os.remove("{}.{}".format(root, ext))
# also remove from jobs file
if os.path.isfile("jobs.txt"):
with open("jobs.txt", "r+") as f:
flines = f.readlines()
f.seek(0)
            for line in flines:
                # keep only entries that do not match a res file in the cwd;
                # write the original line back so newlines are preserved
                if line.strip() in res_list:
                    continue
                f.write(line)
            f.truncate()
return len(res_list)


def reset_single_seed(seed):
"""Remove the file lock and jobs.txt entry
for a single seed.
Parameters:
seed (str): the seedname to remove.
"""
if os.path.isfile("jobs.txt"):
with open("jobs.txt", "r+") as f:
flines = f.readlines()
f.seek(0)
            for line in flines:
                # drop any jobs.txt entry that mentions this seed; write the
                # original line back so newlines are preserved
                if seed in line:
                    continue
                f.write(line)
            f.truncate()
if os.path.isfile(seed + ".lock"):
os.remove(seed + ".lock")