# coding: utf-8
# Distributed under the terms of the MIT License.
""" This file implements a simple interface to basic SLURM
functionality, including creating and submitting slurm scripts and
cancelling jobs.
"""
from matador.compute.queueing import QueueManager
class SlurmQueueManager(QueueManager):
    """Wrapper for the Slurm queueing system.

    Every ``get_*`` method reads the Slurm environment variables held in
    ``self.env``; ``get_walltime`` additionally queries ``scontrol``.
    """

    token = "slurm"

    def get_array_id(self):
        """Return the array task ID of the current job.

        Returns:
            int: value of ``SLURM_ARRAY_TASK_ID``, or ``None`` if the
                variable is not set.

        """
        array_id = self.env.get("SLURM_ARRAY_TASK_ID")
        if array_id is None:
            return None
        return int(array_id)

    def get_ntasks(self):
        """Return the number of tasks of the current job.

        Raises:
            KeyError: if ``SLURM_NTASKS`` is not present in the env.

        Returns:
            int: value of ``SLURM_NTASKS``.

        """
        return int(self.env["SLURM_NTASKS"])

    def get_max_memory(self):
        """Return the total memory available to the current job.

        Returns:
            float: ``SLURM_MEM_PER_CPU`` multiplied by ``self.ntasks``,
                or ``None`` if ``SLURM_MEM_PER_CPU`` is not set.

        """
        mem_per_cpu = self.env.get("SLURM_MEM_PER_CPU")
        if mem_per_cpu is None:
            return None
        # NOTE(review): self.ntasks is not assigned in this class;
        # presumably the QueueManager base sets it -- confirm.
        return float(mem_per_cpu) * self.ntasks

    def get_walltime(self):
        """Query available walltime with scontrol on the current job.

        Raises:
            subprocess.CalledProcessError: if unable to use scontrol.

        Returns:
            int: maximum allowed walltime in seconds, or ``None`` if
                ``SLURM_JOB_ID`` is not set in the env or if scontrol
                does not report a finite ``TimeLimit`` (field missing,
                or a non-numeric value such as ``UNLIMITED``).

        """
        import subprocess as sp

        job_id = self.env.get("SLURM_JOB_ID")
        if job_id is None:
            return None

        output = sp.check_output(
            "scontrol show job={}".format(job_id), shell=True
        ).decode("utf-8")

        # scontrol prints whitespace-separated ``Key=Value`` tokens; keys
        # are lower-cased so that the lookup below is case-insensitive.
        output_dict = {}
        for token in output.split():
            fields = token.split("=")
            output_dict[fields[0].lower()] = fields[-1]

        return self._timelimit_to_seconds(output_dict.get("timelimit"))

    @staticmethod
    def _timelimit_to_seconds(timelimit):
        """Convert a ``[D-]HH:MM:SS`` string into a number of seconds.

        Parameters:
            timelimit (str): time limit string, or ``None``.

        Returns:
            int: the time limit in seconds, or ``None`` if ``timelimit``
                is ``None`` or does not have the ``[D-]HH:MM:SS`` form.

        """
        import re

        if timelimit is None:
            return None

        match = re.fullmatch(r"(?:(\d+)-)?(\d+):(\d+):(\d+)", timelimit)
        if match is None:
            # e.g. "UNLIMITED": there is no finite walltime to report.
            return None

        days, hrs, mins, secs = (int(group or 0) for group in match.groups())
        return ((days * 24 + hrs) * 60 + mins) * 60 + secs
def scancel_all_matching_jobs(name=None):
    """Cancel the current user's jobs, optionally only those with a
    given job name.

    Keyword arguments:
        name (str): optional job name to pass to ``scancel -n``.

    Raises:
        subprocess.CalledProcessError: if scancel exits with an error.

    Returns:
        str: output from scancel.

    """
    # getpass.getuser() is used rather than os.getlogin(), as the latter
    # raises OSError when the process has no controlling terminal.
    from getpass import getuser
    from shlex import quote
    import subprocess as sp

    command = "scancel -u {}".format(quote(getuser()))
    if name is not None:
        # Quote the name so that spaces or shell metacharacters in it
        # reach scancel as a single literal argument.
        command += " -n {}".format(quote(str(name)))

    return sp.check_output(command, shell=True).decode("utf-8")
def submit_slurm_script(slurm_fname, depend_on_job=None, num_array_tasks=None):
    """Submit a SLURM job.

    Parameters:
        slurm_fname (str): SLURM job file to submit.

    Keyword arguments:
        depend_on_job (int): job ID to make current job depend on.
        num_array_tasks (int): number of array tasks to submit; a value
            of 1 submits a plain (non-array) job.

    Raises:
        ValueError: if ``num_array_tasks`` is given but is less than 1.
        subprocess.CalledProcessError: if jobfile doesn't exist or has failed.

    Return:
        int: submitted SLURM job ID.

    """
    import re
    import subprocess as sp

    # Validate before building the command; an explicit raise (unlike an
    # assert) is still enforced when Python runs with -O.
    if num_array_tasks is not None and num_array_tasks < 1:
        raise ValueError(
            "num_array_tasks must be a positive integer, not {!r}".format(
                num_array_tasks
            )
        )

    command = "sbatch "
    if depend_on_job is not None:
        command += "--dependency=afterany:{} ".format(depend_on_job)
    if num_array_tasks is not None and num_array_tasks != 1:
        command += "--array=0-{} ".format(num_array_tasks - 1)
    # NOTE: slurm_fname is interpolated into a shell command unquoted,
    # so paths containing spaces or shell metacharacters are not supported.
    command += "{}".format(slurm_fname)

    slurm_output = sp.check_output(command, shell=True).decode("utf-8")

    # Prefer the ID that directly follows "Submitted batch job", since
    # sbatch may append further text (e.g. " on cluster <name>") after it.
    match = re.search(r"Submitted batch job (\d+)", slurm_output)
    if match is not None:
        return int(match.group(1))
    # Otherwise assume the job ID is the final token of the output.
    return int(slurm_output.strip().split()[-1])
def write_slurm_submission_script(
    slurm_fname, slurm_dict, compute_string, walltime_hrs, template=None
):
    """Write out a complete slurm submission script.

    The file consists of the header returned by ``get_slurm_header``,
    a blank-line separator, the contents of the optional template,
    another separator, and finally the compute commands.

    Parameters:
        slurm_fname (str): the desired filename for the submission script
        slurm_dict (dict): dictionary of SLURM environment variables
        compute_string (str): the compute commands to run
        walltime_hrs (int): maximum walltime in hours

    Keyword arguments:
        template (str): filename containing job preamble, e.g. module loads

    """
    header = get_slurm_header(slurm_dict, walltime_hrs)

    # With no template the preamble is simply empty.
    preamble_lines = []
    if template is not None:
        with open(template, "r") as template_file:
            preamble_lines = template_file.readlines()

    separator = "\n\n"
    with open(slurm_fname, "w") as script:
        script.write(header)
        script.write(separator)
        script.writelines(preamble_lines)
        script.write(separator)
        script.write(compute_string)