# coding: utf-8
# Distributed under the terms of the MIT License.
""" This file implements a simple interface to basic SLURM
functionality, including creating and submitting slurm scripts and
cancelling jobs.
"""
from matador.compute.queueing import QueueManager
class SlurmQueueManager(QueueManager):
    """ Wrapper for the Slurm queueing system.

    Every getter reads the Slurm environment variables from `self.env`
    (presumably a dict-like object populated by the `QueueManager`
    base class -- confirm against its definition).

    """

    token = 'slurm'

    def get_array_id(self):
        """ Return the array task ID of the current job.

        Returns:
            int: the value of `SLURM_ARRAY_TASK_ID`, or None if the
                variable is not set.

        """
        array_id = self.env.get('SLURM_ARRAY_TASK_ID')
        if array_id is not None:
            return int(array_id)
        return None

    def get_ntasks(self):
        """ Return the number of tasks allocated to the current job.

        Raises:
            KeyError: if `SLURM_NTASKS` is not present in the env.

        Returns:
            int: the value of `SLURM_NTASKS`.

        """
        return int(self.env['SLURM_NTASKS'])

    def get_max_memory(self):
        """ Return the total memory available to the current job.

        Returns:
            float: `SLURM_MEM_PER_CPU` multiplied by `self.ntasks`, in
                whatever units Slurm reported (no conversion is done
                here), or None if `SLURM_MEM_PER_CPU` is not set.

        """
        mem_per_cpu = self.env.get('SLURM_MEM_PER_CPU')
        if mem_per_cpu is None:
            return None
        # NOTE(review): `self.ntasks` is not set in this class; presumably
        # the base class derives it from `get_ntasks` -- confirm.
        return float(mem_per_cpu) * self.ntasks

    def get_walltime(self):
        """ Query available walltime with scontrol on the current job.

        Raises:
            subprocess.CalledProcessError: if unable to use scontrol.
            ValueError: if the reported time limit is numeric but not
                in the `[days-]hours:minutes:seconds` format.

        Returns:
            int: maximum allowed walltime in seconds, or None if
                `SLURM_JOB_ID` is not set, or if scontrol does not
                report a numeric time limit for the job.

        """
        import subprocess as sp

        job_id = self.env.get('SLURM_JOB_ID')
        if job_id is None:
            return None

        output = sp.check_output('scontrol show job={}'.format(job_id), shell=True).decode('utf-8')

        # scontrol prints whitespace-separated Key=Value tokens; keys are
        # lower-cased so the lookup does not depend on scontrol's casing
        output_dict = {}
        for token in output.split():
            pieces = token.split('=')
            output_dict[pieces[0].lower()] = pieces[-1]

        walltime = output_dict.get('timelimit')
        # no limit reported, or a non-numeric placeholder instead of a time
        if walltime is None or walltime.upper() in ('UNLIMITED', 'PARTITION_LIMIT'):
            return None

        # expected format is [days-]hours:minutes:seconds
        days, _, clock = walltime.rpartition('-')
        fields = clock.split(':')
        hrs = 24 * int(days) if days else 0
        hrs += int(fields[0])
        mins = int(fields[1])
        secs = int(fields[2])

        return (60 * hrs + mins) * 60 + secs
def scancel_all_matching_jobs(name=None):
    """ Cancel all of the current user's jobs, optionally restricted
    to those with a given job name.

    Keyword arguments:
        name (str): optional job name to pass to scancel; if given,
            only jobs with this name are cancelled.

    Raises:
        subprocess.CalledProcessError: if scancel exits with an error.

    Returns:
        str: output from scancel.

    """
    import shlex
    import subprocess as sp
    # os.getlogin() needs a controlling terminal and so fails inside
    # batch jobs; getpass.getuser() works without one
    from getpass import getuser

    command = 'scancel -u {}'.format(shlex.quote(getuser()))
    if name is not None:
        # quote the name so spaces or shell metacharacters cannot alter
        # the command that is run
        command += ' -n {}'.format(shlex.quote(str(name)))

    return sp.check_output(command, shell=True).decode('utf-8')
def submit_slurm_script(slurm_fname, depend_on_job=None, num_array_tasks=None):
    """ Submit a SLURM job with sbatch.

    Parameters:
        slurm_fname (str): SLURM job file to submit.

    Keyword arguments:
        depend_on_job (int): job ID to make current job depend on.
        num_array_tasks (int): number of array tasks to submit; a value
            of 1 submits a plain (non-array) job.

    Raises:
        ValueError: if num_array_tasks is given but is not positive.
        subprocess.CalledProcessError: if jobfile doesn't exist or has failed.

    Returns:
        int: submitted SLURM job ID.

    """
    import re
    import shlex
    import subprocess as sp

    # validate before building or running anything; an explicit raise is
    # used as `assert` would be stripped when running with -O
    if num_array_tasks is not None and not num_array_tasks > 0:
        raise ValueError(
            'num_array_tasks must be positive, not {}'.format(num_array_tasks)
        )

    command = 'sbatch '
    if depend_on_job is not None:
        command += '--dependency=afterany:{} '.format(depend_on_job)
    if num_array_tasks is not None and num_array_tasks != 1:
        # array indices are zero-based, so the last index is N - 1
        command += '--array=0-{} '.format(num_array_tasks - 1)
    # quoting leaves ordinary paths untouched but protects any that
    # contain spaces or shell metacharacters
    command += shlex.quote(str(slurm_fname))

    slurm_output = sp.check_output(command, shell=True).decode('utf-8')

    # the job ID is not always the last token, e.g. multi-cluster output
    # reads "Submitted batch job <id> on cluster <name>"
    match = re.search(r'Submitted batch job (\d+)', slurm_output)
    if match is not None:
        return int(match.group(1))

    # otherwise fall back to treating the final token as the job ID
    return int(slurm_output.strip().split()[-1])
def write_slurm_submission_script(slurm_fname, slurm_dict, compute_string, walltime_hrs, template=None):
    """ Assemble a slurm submission script from the input settings
    and write it to disk.

    The file consists of the header returned by `get_slurm_header`,
    a blank line, the lines of the template file (if any) copied
    verbatim, another blank line, and finally the compute string.

    Parameters:
        slurm_fname (str): the desired filename for the submission script
        slurm_dict (dict): dictionary of SLURM environment variables
        compute_string (str): the compute commands to run
        walltime_hrs (int): maximum walltime in hours

    Keyword arguments:
        template (str): filename containing job preamble, e.g. module loads

    """
    header = get_slurm_header(slurm_dict, walltime_hrs)

    # the preamble stays empty unless a template file is provided
    preamble = []
    if template is not None:
        with open(template, 'r') as template_file:
            preamble = template_file.readlines()

    with open(slurm_fname, 'w') as script:
        script.write(header)
        script.write('\n\n')
        # template lines keep their own line endings
        script.writelines(preamble)
        script.write('\n\n')
        script.write(compute_string)