Source code for cmd_queue.slurm_queue

r"""
Work in progress. The idea is to provide a TMUX queue and a SLURM queue that
share a common high-level API. Even though their functionality might diverge,
both should provide the core capability of running processes asynchronously.

Notes:
    # Installing and configuring SLURM
    See git@github.com:Erotemic/local.git init/setup_slurm.sh
    Or ~/local/init/setup_slurm.sh in my local checkout

    Submitted commands will use /bin/sh by default; it is not clear how to
    fix that properly, but there are workarounds.
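    For example, constructing the queue as ``SlurmQueue(shell='/bin/bash')``
    wraps each submitted command in ``bash -c '...'`` before it is passed to
    ``sbatch --wrap``.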


CommandLine:
    xdoctest -m cmd_queue.slurm_queue __doc__

Example:
    >>> from cmd_queue.slurm_queue import *  # NOQA
    >>> dpath = ub.Path.appdir('slurm_queue/tests')
    >>> queue = SlurmQueue()
    >>> job0 = queue.submit(f'echo "here we go"', name='root job')
    >>> job1 = queue.submit(f'mkdir -p {dpath}', depends=[job0])
    >>> job2 = queue.submit(f'echo "result=42" > {dpath}/test.txt ', depends=[job1])
    >>> job3 = queue.submit(f'cat {dpath}/test.txt', depends=[job2])
    >>> queue.print_commands()
    >>> # xdoctest: +REQUIRES(--run)
    >>> queue.run()
    >>> # Can read the output of jobs after they are done.
    >>> for job in queue.jobs:
    >>>     print('-----------------')
    >>>     print(f'job.name={job.name}')
    >>>     if job.output_fpath.exists():
    >>>         print(job.output_fpath.read_text())
    >>>     else:
    >>>         print('output does not exist')
"""
import ubelt as ub

from cmd_queue import base_queue  # NOQA
from cmd_queue.util import util_tags


def _coerce_mem(mem):
    """
    Args:
        mem (int | str): integer number of megabytes or a parseable string

    Example:
        >>> from cmd_queue.slurm_queue import *  # NOQA
        >>> print(_coerce_mem(30602))
        >>> print(_coerce_mem('4GB'))
        >>> print(_coerce_mem('32GB'))
        >>> print(_coerce_mem('300000000 bytes'))
    """
    if isinstance(mem, int):
        assert mem > 0
    elif isinstance(mem, str):
        import pint
        reg = pint.UnitRegistry()
        mem = reg.parse_expression(mem)
        mem = int(mem.to('megabytes').m)
    else:
        raise TypeError(type(mem))
    return mem


# List of extra keys that can be specified as key/value pairs in sbatch args
# These are acceptable kwargs for SlurmQueue.__init__ and SlurmQueue.submit
__dev__ = r"""
# Script to build the modifier list
import ubelt as ub
import re
b = xdev.regex_builder.RegexBuilder.coerce('python')

blocklist = {'job_name', 'output', 'dependency', 'begin'}
keyval_pat = re.compile(r'--([\w-]+)=')
text = ub.cmd('sbatch --help')['out']
lines = ub.oset()
for key in keyval_pat.findall(text):
    lines.append(key.replace('-', '_'))
print(ub.urepr(list(lines - blocklist)))

blocklist = {'mem', 'version', 'help', 'usage'}
flag_pat = re.compile(r'--([\w-]+) ')
lines = ub.oset()
for key in flag_pat.findall(text):
    lines.append(key.replace('-', '_'))
print(ub.urepr(list(lines - blocklist)))
"""

SLURM_SBATCH_KVARGS = [
    'array', 'account', 'bb', 'bbf',
    # 'begin',
    'comment', 'cpu_freq', 'cpus_per_task',
    # 'dependency',
    'deadline', 'delay_boot', 'chdir', 'error', 'export_file', 'gid',
    'gres', 'gres_flags', 'input',
    # 'job_name',
    'licenses', 'clusters', 'distribution', 'mail_type', 'mail_user',
    'mcs_label', 'ntasks', 'ntasks_per_node', 'nodes',
    # 'output',
    'partition', 'power', 'priority', 'profile', 'qos', 'core_spec',
    'signal', 'switches', 'thread_spec', 'time', 'time_min', 'uid',
    'wckey', 'cluster_constraint', 'constraint', 'nodefile', 'mem',
    'mincpus', 'reservation', 'tmp', 'nodelist', 'exclude',
    'mem_per_cpu', 'sockets_per_node', 'cores_per_socket',
    'threads_per_core', 'extra_node_info', 'ntasks_per_core',
    'ntasks_per_socket', 'hint', 'mem_bind', 'cpus_per_gpu', 'gpus',
    'gpu_bind', 'gpu_freq', 'gpus_per_node', 'gpus_per_socket',
    'gpus_per_task', 'mem_per_gpu',
]

SLURM_SBATCH_FLAGS = [
    'get_user_env', 'hold', 'ignore_pbs', 'no_kill', 'container',
    'no_requeue', 'overcommit', 'parsable', 'quiet', 'reboot', 'requeue',
    'oversubscribe', 'spread_job', 'use_min_nodes', 'verbose', 'wait',
    'contiguous', 'mem_per_cpu',
]
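
# Illustrative sketch (not part of the module): keyword arguments passed to
# SlurmQueue / SlurmJob are partitioned against the two lists above via
# ``ub.udict(kwargs) & SLURM_SBATCH_KVARGS``. For a hypothetical
# ``kwargs = {'partition': 'community', 'ntasks': 1, 'parsable': True}`` the
# key/value args would be ``{'partition': 'community', 'ntasks': 1}`` and the
# flags would be ``{'parsable': True}``.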


class SlurmJob(base_queue.Job):
    """
    Represents a slurm job that hasn't been submitted yet

    Example:
        >>> from cmd_queue.slurm_queue import *  # NOQA
        >>> self = SlurmJob('python -c print("hello world")', 'hi', cpus=5, gpus=1, mem='10GB')
        >>> command = self._build_sbatch_args()
        >>> print('command = {!r}'.format(command))
        >>> self = SlurmJob('python -c print("hello world")', 'hi', cpus=5, gpus=1, mem='10GB', depends=[self])
        >>> command = self._build_command()
        >>> print(command)
    """
    def __init__(self, command, name=None, output_fpath=None, depends=None,
                 cpus=None, gpus=None, mem=None, begin=None, shell=None,
                 tags=None, **kwargs):
        super().__init__()
        if name is None:
            import uuid
            name = 'job-' + str(uuid.uuid4())
        if depends is not None and not ub.iterable(depends):
            depends = [depends]
        self.unused_kwargs = kwargs
        self.command = command
        self.name = name
        self.output_fpath = output_fpath
        self.depends = depends
        self.cpus = cpus
        self.gpus = gpus
        self.mem = mem
        self.begin = begin
        self.shell = shell
        self.tags = util_tags.Tags.coerce(tags)
        # Extra arguments for sbatch
        self._sbatch_kvargs = ub.udict(kwargs) & SLURM_SBATCH_KVARGS
        self._sbatch_flags = ub.udict(kwargs) & SLURM_SBATCH_FLAGS
        # if shell not in {None, 'bash'}:
        #     raise NotImplementedError(shell)
        self.jobid = None  # only set once this is run (maybe)
        # --partition=community --cpus-per-task=5 --mem=30602 --gres=gpu:1

    def __nice__(self):
        return repr(self.command)

    def _build_command(self, jobname_to_varname=None):
        args = self._build_sbatch_args(jobname_to_varname=jobname_to_varname)
        return ' \\\n '.join(args)

    def _build_sbatch_args(self, jobname_to_varname=None):
        # job_name = 'todo'
        # output_fpath = '$HOME/.cache/slurm/logs/job-%j-%x.out'
        # command = "python -c 'import sys; sys.exit(1)'"
        # -c 2 -p priority --gres=gpu:1
        sbatch_args = ['sbatch']
        if self.name:
            sbatch_args.append(f'--job-name="{self.name}"')
        if self.cpus:
            sbatch_args.append(f'--cpus-per-task={self.cpus}')
        if self.mem:
            mem = _coerce_mem(self.mem)
            sbatch_args.append(f'--mem={mem}')
        if self.gpus and 'gres' not in self._sbatch_kvargs:
            ub.schedule_deprecation(
                'cmd_queue', name='gres', type='argument',
                migration=ub.paragraph(
                    '''
                    the handling of gres here is broken and will be changed in
                    the future. For now specify gres explicitly in
                    slurm_options or the kwargs for the queue.
                    '''),
                deprecate='now'
            )
            # NOTE: the handling of gres here is broken and will be changed in
            # the future. For now specify gres explicitly in slurm_options
            def _coerce_gres(gpus):
                if isinstance(gpus, str):
                    gres = gpus
                elif isinstance(gpus, int):
                    gres = f'gpu:{gpus}'
                elif isinstance(gpus, list):
                    gres = 'gpu:0'  # hack
                else:
                    raise TypeError(type(gpus))
                return gres
            gres = _coerce_gres(self.gpus)
            sbatch_args.append(f'--gres="{gres}"')
        if self.output_fpath:
            sbatch_args.append(f'--output="{self.output_fpath}"')
        for key, value in self._sbatch_kvargs.items():
            key = key.replace('_', '-')
            sbatch_args.append(f'--{key}="{value}"')
        for key, flag in self._sbatch_flags.items():
            if flag:
                key = key.replace('_', '-')
                sbatch_args.append(f'--{key}')
        if self.depends:
            # TODO: other depends parts
            type_to_dependencies = {
                'afterok': [],
            }
            depends = self.depends if ub.iterable(self.depends) else [self.depends]
            for item in depends:
                if isinstance(item, SlurmJob):
                    jobid = item.jobid
                    if jobid is None and item.name:
                        if jobname_to_varname and item.name in jobname_to_varname:
                            jobid = '${%s}' % jobname_to_varname[item.name]
                        else:
                            jobid = f"$(squeue --noheader --format %i --name '{item.name}')"
                    type_to_dependencies['afterok'].append(jobid)
                else:
                    # if isinstance(item, int):
                    #     type_to_dependencies['afterok'].append(item)
                    # elif isinstance(item, str):
                    #     name = item
                    #     item = f"$(squeue --noheader --format %i --name '{name}')"
                    #     type_to_dependencies['afterok'].append(item)
                    # else:
                    raise TypeError(type(item))
            # squeue --noheader --format %i --name <JOB_NAME>
            depends_parts = []
            for type_, jobids in type_to_dependencies.items():
                if jobids:
                    part = ':'.join([str(j) for j in jobids])
                    depends_parts.append(f'{type_}:{part}')
            depends_part = ','.join(depends_parts)
            sbatch_args.append(f'"--dependency={depends_part}"')
            # Kills jobs too fast
            # sbatch_args.append('"--kill-on-invalid-dep=yes"')
        if self.begin:
            if isinstance(self.begin, int):
                sbatch_args.append(f'"--begin=now+{self.begin}"')
            else:
                sbatch_args.append(f'"--begin={self.begin}"')
        import shlex
        wrp_command = shlex.quote(self.command)
        if self.shell:
            wrp_command = shlex.quote(self.shell + ' -c ' + wrp_command)
        sbatch_args.append(f'--wrap {wrp_command}')
        return sbatch_args
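
    # Illustrative sketch (hypothetical names and values): for a job with
    # name='fit', cpus=2, mem=10000, an output path, one dependency resolved
    # through jobname_to_varname, and the command 'python train.py',
    # _build_sbatch_args would produce roughly:
    #
    #   ['sbatch', '--job-name="fit"', '--cpus-per-task=2', '--mem=10000',
    #    '--output="/path/to/logs/fit.sh"',
    #    '"--dependency=afterok:${JOB_000}"', "--wrap 'python train.py'"]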


class SlurmQueue(base_queue.Queue):
    """
    CommandLine:
        xdoctest -m cmd_queue.slurm_queue SlurmQueue

    Example:
        >>> from cmd_queue.slurm_queue import *  # NOQA
        >>> self = SlurmQueue()
        >>> job0 = self.submit('echo "hi from $SLURM_JOBID"', begin=0)
        >>> job1 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job0])
        >>> job2 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job1])
        >>> job3 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job2])
        >>> job4 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job3])
        >>> job5 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job4])
        >>> job6 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job0])
        >>> job7 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job5, job6])
        >>> self.write()
        >>> self.print_commands()
        >>> # xdoctest: +REQUIRES(--run)
        >>> if self.is_available():
        >>>     self.run()

    Example:
        >>> from cmd_queue.slurm_queue import *  # NOQA
        >>> self = SlurmQueue(shell='/bin/bash')
        >>> self.add_header_command('export FOO=bar')
        >>> job0 = self.submit('echo "$FOO"')
        >>> job1 = self.submit('echo "$FOO"', depends=job0)
        >>> job2 = self.submit('echo "$FOO"')
        >>> job3 = self.submit('echo "$FOO"', depends=job2)
        >>> self.sync()
        >>> job4 = self.submit('echo "$FOO"')
        >>> self.sync()
        >>> job5 = self.submit('echo "$FOO"')
        >>> self.print_commands()
    """
    def __init__(self, name=None, shell=None, **kwargs):
        super().__init__()
        import uuid
        import time
        self.jobs = []
        if name is None:
            name = 'SQ'
        stamp = time.strftime('%Y%m%dT%H%M%S')
        self.unused_kwargs = kwargs
        self.queue_id = name + '-' + stamp + '-' + ub.hash_data(uuid.uuid4())[0:8]
        self.dpath = ub.Path.appdir('cmd_queue/slurm') / self.queue_id
        self.log_dpath = self.dpath / 'logs'
        self.fpath = self.dpath / (self.queue_id + '.sh')
        self.shell = shell
        self.header_commands = []
        self.all_depends = None
        self._sbatch_kvargs = ub.udict(kwargs) & SLURM_SBATCH_KVARGS
        self._sbatch_flags = ub.udict(kwargs) & SLURM_SBATCH_FLAGS

    def __nice__(self):
        return self.queue_id

    @classmethod
    def is_available(cls):
        """
        Determines if we can run the slurm queue or not.
        """
        if ub.find_exe('squeue'):
            import psutil
            slurmd_running = any(p.name() == 'slurmd' for p in psutil.process_iter())
            if slurmd_running:
                squeue_working = (ub.cmd('squeue')['ret'] == 0)
                if squeue_working:
                    # Check if nodes are available or down
                    sinfo = ub.cmd('sinfo --json')
                    if sinfo['ret'] == 0:
                        import json
                        sinfo_out = json.loads(sinfo['out'])
                        has_working_nodes = not all(
                            node['state'] == 'down' for node in sinfo_out['nodes'])
                        if has_working_nodes:
                            return True
        return False

    def submit(self, command, **kwargs):
        name = kwargs.get('name', None)
        if name is None:
            name = kwargs['name'] = f'J{len(self.jobs):04d}-{self.queue_id}'  # + '-job-{}'.format(len(self.jobs))
        if 'output_fpath' not in kwargs:
            kwargs['output_fpath'] = self.log_dpath / (name + '.sh')
        if self.shell is not None:
            kwargs['shell'] = kwargs.get('shell', self.shell)
        if self.all_depends:
            depends = kwargs.get('depends', None)
            if depends is None:
                depends = self.all_depends
            else:
                if not ub.iterable(depends):
                    depends = [depends]
                depends = self.all_depends + depends
            kwargs['depends'] = depends
        depends = kwargs.pop('depends', None)
        if depends is not None:
            # Resolve any strings to job objects
            if not ub.iterable(depends):
                depends = [depends]
            depends = [
                self.named_jobs[dep] if isinstance(dep, str) else dep
                for dep in depends]
        _kwargs = self._sbatch_kvargs | kwargs
        job = SlurmJob(command, depends=depends, **_kwargs)
        self.jobs.append(job)
        self.num_real_jobs += 1
        self.named_jobs[job.name] = job
        return job

    def add_header_command(self, command):
        self.header_commands.append(command)

    def order_jobs(self):
        import networkx as nx
        graph = self._dependency_graph()
        if 0:
            print(nx.forest_str(nx.minimum_spanning_arborescence(graph)))
        new_order = []
        for node in nx.topological_sort(graph):
            job = graph.nodes[node]['job']
            new_order.append(job)
        return new_order

    def finalize_text(self, exclude_tags=None, **kwargs):
        exclude_tags = util_tags.Tags.coerce(exclude_tags)
        new_order = self.order_jobs()
        commands = []
        homevar = '$HOME'
        commands.append(f'mkdir -p "{self.log_dpath.shrinkuser(homevar)}"')
        jobname_to_varname = {}
        for job in new_order:
            if exclude_tags and exclude_tags.intersection(job.tags):
                continue
            # args = job._build_sbatch_args(jobname_to_varname)
            # command = ' '.join(args)
            command = job._build_command(jobname_to_varname)
            if self.header_commands:
                command = ' && '.join(self.header_commands + [command])
            if 1:
                varname = 'JOB_{:03d}'.format(len(jobname_to_varname))
                command = f'{varname}=$({command} --parsable)'
                jobname_to_varname[job.name] = varname
            commands.append(command)
        self.jobname_to_varname = jobname_to_varname
        text = '\n'.join(commands)
        return text
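
    # For reference, the text produced by finalize_text resembles this sketch
    # (queue/job names are hypothetical; paths assume a typical Linux
    # ~/.cache configuration):
    #
    #   mkdir -p "$HOME/.cache/cmd_queue/slurm/<queue_id>/logs"
    #   JOB_000=$(sbatch --job-name="J0000-<queue_id>" ... --wrap '...' --parsable)
    #   JOB_001=$(sbatch --job-name="J0001-<queue_id>" ... \
    #       "--dependency=afterok:${JOB_000}" --wrap '...' --parsable)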

    def run(self, block=True, system=False, **kw):
        if not self.is_available():
            raise Exception('slurm backend is not available')
        self.log_dpath.ensuredir()
        self.write()
        ub.cmd(f'bash {self.fpath}', verbose=3, check=True, system=system)
        if block:
            return self.monitor()

    def monitor(self, refresh_rate=0.4):
        """
        Monitor progress until the jobs are done
        """
        import time
        from rich.live import Live
        from rich.table import Table
        import io
        import pandas as pd
        jobid_history = set()
        num_at_start = None

        def update_status_table():
            nonlocal num_at_start
            # https://rich.readthedocs.io/en/stable/live.html
            # Format codes correspond to:
            # JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
            info = ub.cmd('squeue --format="%i %P %j %u %t %M %D %R"')
            stream = io.StringIO(info['out'])
            df = pd.read_csv(stream, sep=' ')
            jobid_history.update(df['JOBID'])
            num_running = (df['ST'] == 'R').sum()
            num_in_queue = len(df)
            total_monitored = len(jobid_history)

            HACK_KILL_BROKEN_JOBS = 1
            if HACK_KILL_BROKEN_JOBS:
                # For whatever reason, using kill-on-invalid-dep kills jobs
                # too quickly: it cancels them while they are merely waiting
                # on a dependency, not only when the dependency can never be
                # satisfied. Killing broken jobs here instead seems to fix it.
                broken_jobs = df[df['NODELIST(REASON)'] == '(DependencyNeverSatisfied)']
                if len(broken_jobs):
                    for name in broken_jobs['NAME']:
                        ub.cmd(f'scancel --name="{name}"')

            if num_at_start is None:
                num_at_start = len(df)

            table = Table(*['num_running', 'num_in_queue', 'total_monitored',
                            'num_at_start'], title='slurm-monitor')
            # TODO: determine if slurm has accounting on, and if we can
            # figure out how many jobs errored / passed
            table.add_row(
                f'{num_running}',
                f'{num_in_queue}',
                f'{total_monitored}',
                f'{num_at_start}',
            )
            finished = (num_in_queue == 0)
            return table, finished

        try:
            table, finished = update_status_table()
            with Live(table, refresh_per_second=4) as live:
                while not finished:
                    time.sleep(refresh_rate)
                    table, finished = update_status_table()
                    live.update(table)
        except KeyboardInterrupt:
            from rich.prompt import Confirm
            flag = Confirm.ask('do you want to kill the procs?')
            if flag:
                self.kill()

    def kill(self):
        cancel_commands = []
        for job in self.jobs:
            cancel_commands.append(f'scancel --name="{job.name}"')
        for cmd in cancel_commands:
            ub.cmd(cmd, verbose=2)

    def read_state(self):
        # Not possible to get full info, but we probably could do better than
        # this
        return {}

    def print_commands(self, *args, **kwargs):
        r"""
        Print info about the commands, optionally with rich

        Args:
            exclude_tags (List[str] | None):
                if specified exclude jobs submitted with these tags.

            style (str): can be 'colors', 'rich', or 'plain'

        CommandLine:
            xdoctest -m cmd_queue.slurm_queue SlurmQueue.print_commands

        Example:
            >>> from cmd_queue.slurm_queue import *  # NOQA
            >>> self = SlurmQueue('test-slurm-queue')
            >>> self.submit('echo hi 1')
            >>> self.submit('echo hi 2')
            >>> self.submit('echo boilerplate', tags='boilerplate')
            >>> self.print_commands(with_status=True)
            >>> print('\n\n---\n\n')
            >>> self.print_commands(with_status=0, exclude_tags='boilerplate')
        """
        return super().print_commands(*args, **kwargs)

    rprint = print_commands


SLURM_NOTES = r"""
This shows a few things you can do with slurm

# Queue a job in the background
mkdir -p "$HOME/.cache/slurm/logs"
sbatch --job-name="test_job1" --output="$HOME/.cache/slurm/logs/job-%j-%x.out" --wrap="python -c 'import sys; sys.exit(1)'"
sbatch --job-name="test_job2" --output="$HOME/.cache/slurm/logs/job-%j-%x.out" --wrap="echo 'hello'"

#ls $HOME/.cache/slurm/logs
cat "$HOME/.cache/slurm/logs/test_echo.log"

# Queue a job (and block until completed)
srun -c 2 -p priority --gres=gpu:1 echo "hello"
srun echo "hello"

# List jobs in the queue
squeue
squeue --format="%i %P %j %u %t %M %D %R"

# Show job with specific id (e.g. 6)
scontrol show job 6

# Cancel a job with a specific id
scancel 6

# Cancel all jobs from a user
scancel --user="$USER"

# You can setup complicated pipelines
# https://hpc.nih.gov/docs/job_dependencies.html

# Look at finished jobs
# https://ubccr.freshdesk.com/support/solutions/articles/5000686909-how-to-retrieve-job-history-and-accounting

# Jobs run since 3:30pm
sudo sacct --starttime 15:35:00
sudo sacct
sudo sacct --format="JobID,JobName%30,Partition,Account,AllocCPUS,State,ExitCode,elapsed,start"
sudo sacct --format="JobID,JobName%30,State,ExitCode,elapsed,start"

# SHOW ALL JOBS that ran within MinJobAge
scontrol show jobs

# State of each partition
sinfo

# If the states of the partitions are in drain, find out the reason
sinfo -R
# For "Low socket*core*thre" FIGURE THIS OUT

# Undrain all nodes: first cancel all jobs
# https://stackoverflow.com/questions/29535118/how-to-undrain-slurm-nodes-in-drain-state
scancel --user="$USER"
scancel --state=PENDING
scancel --state=RUNNING
scancel --state=SUSPENDED
sudo scontrol update nodename=namek state=idle

# How to submit a batch job with a dependency
sbatch --dependency=<type:job_id[:job_id][,type:job_id[:job_id]]> ...

Dependency types:

    after:jobid[:jobid...]      job can begin after the specified jobs have started
    afterany:jobid[:jobid...]   job can begin after the specified jobs have terminated
    afternotok:jobid[:jobid...] job can begin after the specified jobs have failed
    afterok:jobid[:jobid...]    job can begin after the specified jobs have run to
                                completion with an exit code of zero (see the user
                                guide for caveats).
    singleton                   jobs can begin execution after all previously launched
                                jobs with the same name and user have ended. This is
                                useful to collate results of a swarm or to send a
                                notification at the end of a swarm.

sbatch \
    --job-name="tester1" \
    --output="test-job-%j-%x.out" \
    --cpus-per-task=1 --mem=1000 --gres="gpu:1" \
    --gpu-bind "map_gpu:2,3" \
    --wrap="python -c \"import torch, os; print(os.getenv('CUDA_VISIBLE_DEVICES', 'x')) and torch.rand(1000).to(0)\""

squeue
"""