Source code for matador.compute.slurm

# coding: utf-8
# Distributed under the terms of the MIT License.

""" This file implements a simple interface to basic SLURM
functionality, including creating and submitting slurm scripts and
cancelling jobs.

"""

from matador.compute.queueing import QueueManager


class SlurmQueueManager(QueueManager):
    """ Wrapper for the Slurm queueing system.

    All queries read Slurm's environment variables from ``self.env``
    (presumably populated by the ``QueueManager`` base class -- confirm
    against that class).

    """

    # identifier for this queueing system
    token = 'slurm'

    def get_array_id(self):
        """ Return the array task ID of the current job.

        Returns:
            int: value of SLURM_ARRAY_TASK_ID, or None if it is not set.

        """
        task_id = self.env.get('SLURM_ARRAY_TASK_ID')
        if task_id is None:
            return None
        return int(task_id)

    def get_ntasks(self):
        """ Return the number of tasks allocated to the current job.

        Returns:
            int: value of SLURM_NTASKS.

        Raises:
            KeyError: if SLURM_NTASKS is not present in the environment.

        """
        return int(self.env['SLURM_NTASKS'])

    def get_max_memory(self):
        """ Return the total memory available to the job, computed as
        SLURM_MEM_PER_CPU multiplied by ``self.ntasks``.

        Returns:
            float: memory per CPU times number of tasks, in whatever
                units Slurm reported SLURM_MEM_PER_CPU in, or None if
                SLURM_MEM_PER_CPU is not set.

        """
        mem_per_cpu = self.env.get('SLURM_MEM_PER_CPU')
        if mem_per_cpu is None:
            return None
        # NOTE(review): self.ntasks is not set in this class; presumably
        # the base class provides it -- confirm.
        return float(mem_per_cpu) * self.ntasks

    def get_walltime(self):
        """ Query available walltime with scontrol on the current job.

        Raises:
            subprocess.CalledProcessError: if unable to use scontrol.

        Returns:
            int: maximum allowed walltime in seconds, or None if
                SLURM_JOB_ID is not set or scontrol does not report a
                finite ``[D-]HH:MM:SS`` time limit (e.g. ``UNLIMITED``).

        """
        import subprocess as sp

        job_id = self.env.get('SLURM_JOB_ID')
        if job_id is None:
            return None

        output = sp.check_output('scontrol show job={}'.format(job_id), shell=True).decode('utf-8')

        # scontrol prints whitespace-separated Key=Value tokens; keys are
        # lower-cased so the lookup below is case-insensitive.
        fields = {}
        for token in output.split():
            parts = token.split('=')
            fields[parts[0].lower()] = parts[-1]

        return self._walltime_to_seconds(fields.get('timelimit'))

    @staticmethod
    def _walltime_to_seconds(walltime):
        """ Convert a ``[D-]HH:MM:SS`` time limit string into seconds.

        Parameters:
            walltime (str): time limit string, or None.

        Returns:
            int: the time limit in seconds, or None if the string is
                missing or not of the form ``[D-]HH:MM:SS``.

        """
        if not walltime:
            return None

        days = '0'
        clock = walltime
        if '-' in walltime:
            pieces = walltime.split('-')
            days, clock = pieces[0], pieces[1]

        hms = clock.split(':')
        if len(hms) != 3:
            return None

        try:
            days, hrs, mins, secs = (int(value) for value in [days] + hms)
        except ValueError:
            # non-numeric limit, nothing sensible to convert
            return None

        return ((days * 24 + hrs) * 60 + mins) * 60 + secs
def scancel_all_matching_jobs(name=None):
    """ Cancel all of the user's jobs, optionally restricted to those
    with a given job name.

    Keyword arguments:
        name (str): optional job name to pass to ``scancel -n``.

    Raises:
        subprocess.CalledProcessError: if scancel exits with a non-zero
            status.

    Returns:
        str: output from scancel.

    """
    from getpass import getuser
    from os import getlogin
    from shlex import quote
    import subprocess as sp

    try:
        user = getlogin()
    except OSError:
        # getlogin() needs a controlling terminal, which is often absent
        # inside batch jobs; fall back to the environment/passwd lookup.
        user = getuser()

    # quote every interpolated value so that spaces or shell
    # metacharacters cannot alter the command
    command = 'scancel -u {}'.format(quote(user))
    if name is not None:
        command += ' -n {}'.format(quote(str(name)))

    return sp.check_output(command, shell=True).decode('utf-8')
def submit_slurm_script(slurm_fname, depend_on_job=None, num_array_tasks=None):
    """ Submit a SLURM job with sbatch.

    Parameters:
        slurm_fname (str): SLURM job file to submit.

    Keyword arguments:
        depend_on_job (int): job ID to make current job depend on
            (passed as ``--dependency=afterany:<id>``).
        num_array_tasks (int): number of array tasks to submit; values
            greater than 1 add ``--array=0-<n-1>``.

    Raises:
        AssertionError: if num_array_tasks is given but not positive.
        subprocess.CalledProcessError: if jobfile doesn't exist or has failed.

    Return:
        int: submitted SLURM job ID.

    """
    from shlex import quote
    import subprocess as sp

    command = 'sbatch '
    if depend_on_job is not None:
        command += '--dependency=afterany:{} '.format(quote(str(depend_on_job)))

    if num_array_tasks is not None:
        # Raised explicitly rather than with ``assert`` so the check is
        # not stripped under ``python -O``; the exception type is kept.
        if not num_array_tasks > 0:
            raise AssertionError(
                'num_array_tasks must be positive, not {}'.format(num_array_tasks)
            )
        if num_array_tasks != 1:
            command += '--array=0-{} '.format(num_array_tasks - 1)

    # quoted so that filenames containing spaces reach sbatch intact
    command += quote(str(slurm_fname))

    slurm_output = sp.check_output(command, shell=True).decode('utf-8')
    # the job ID is taken to be the last whitespace-separated token
    return int(slurm_output.strip().split()[-1])
def get_slurm_header(slurm_dict, walltime_hrs, num_nodes=None):
    """ Build the header of a SLURM submission script from a set of
    slurm parameters.

    Parameters:
        slurm_dict (dict): dictionary of SLURM environment variables;
            SLURM_JOB_NAME, SLURM_JOB_ACCOUNT, SLURM_NTASKS and
            SLURM_JOB_PARTITION are always read.
        walltime_hrs (int): allowed walltime in hours.

    Keyword arguments:
        num_nodes (int): overrides $SLURM_JOB_NUM_NODES with a custom value.

    Returns:
        header (str): the SLURM file header.

    """
    # fragments are evaluated in the order they appear in the script, so
    # the environment is only consulted for the node count if no
    # override was given
    fragments = [
        "#!/bin/bash\n",
        "#! SLURM file written by matador (Matthew Evans 2016-2018).\n\n",
        "#! Name of job:\n",
        "#SBATCH --job-name {}\n".format(slurm_dict['SLURM_JOB_NAME']),
        "#! Name of project:\n",
        "#SBATCH --account {}\n".format(slurm_dict['SLURM_JOB_ACCOUNT']),
        "#! Number of nodes to allocate:\n",
        "#SBATCH --nodes {}\n".format(
            slurm_dict['SLURM_JOB_NUM_NODES'] if num_nodes is None else num_nodes
        ),
        "#! Number of tasks to allocate:\n",
        "#SBATCH --ntasks {}\n".format(slurm_dict['SLURM_NTASKS']),
        "#! Partition:\n",
        "#SBATCH --partition {}\n".format(slurm_dict['SLURM_JOB_PARTITION']),
        "#! Walltime to allocate:\n",
        "#SBATCH --time {}:00:00\n".format(walltime_hrs),
    ]
    return "".join(fragments)
def write_slurm_submission_script(slurm_fname, slurm_dict, compute_string, walltime_hrs, template=None):
    """ Write a full slurm submission script to disk: the generated
    header, then the optional template contents, then the compute
    commands, with a blank-line gap between each section.

    Parameters:
        slurm_fname (str): the desired filename for the submission script
        slurm_dict (dict): dictionary of SLURM environment variables
        compute_string (str): the compute commands to run
        walltime_hrs (int): maximum walltime in hours

    Keyword arguments:
        template (str): filename containing job preamble, e.g. module loads

    """
    header = get_slurm_header(slurm_dict, walltime_hrs)

    # the template is copied through verbatim; no template means an
    # empty preamble, but the separators are still written
    preamble = ''
    if template is not None:
        with open(template, 'r') as template_file:
            preamble = template_file.read()

    with open(slurm_fname, 'w') as script:
        script.writelines((header, '\n\n', preamble, '\n\n', compute_string))