cmd_queue.slurm_queue module

Work in progress. The idea is to provide a TMUX queue and a SLURM queue that share a common high-level API. Even though their functionality might diverge, the core capability of running processes asynchronously should be provided by both.

Notes

# Installing and configuring SLURM: see git@github.com:Erotemic/local.git (init/setup_slurm.sh), or ~/local/init/setup_slurm.sh in my local checkout.

Submit commands will use /bin/sh by default. It is not clear how to fix that properly, but there are workarounds.

CommandLine

xdoctest -m cmd_queue.slurm_queue __doc__

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> dpath = ub.Path.appdir('slurm_queue/tests')
>>> queue = SlurmQueue()
>>> job0 = queue.submit(f'echo "here we go"', name='root job')
>>> job1 = queue.submit(f'mkdir -p {dpath}', depends=[job0])
>>> job2 = queue.submit(f'echo "result=42" > {dpath}/test.txt ', depends=[job1])
>>> job3 = queue.submit(f'cat {dpath}/test.txt', depends=[job2])
>>> queue.print_commands()
>>> # xdoctest: +REQUIRES(--run)
>>> queue.run()
>>> # Can read the output of jobs after they are done.
>>> for job in queue.jobs:
>>>     print('-----------------')
>>>     print(f'job.name={job.name}')
>>>     if job.output_fpath.exists():
>>>         print(job.output_fpath.read_text())
>>>     else:
>>>         print('output does not exist')
cmd_queue.slurm_queue._coerce_mem(mem)[source]
Parameters:

mem (int | str) – integer number of megabytes or a parseable string

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> print(_coerce_mem(30602))
>>> print(_coerce_mem('4GB'))
>>> print(_coerce_mem('32GB'))
>>> print(_coerce_mem('300000000 bytes'))
class cmd_queue.slurm_queue.SlurmJob(command, name=None, output_fpath=None, depends=None, cpus=None, gpus=None, mem=None, begin=None, shell=None, tags=None, **kwargs)[source]

Bases: Job

Represents a slurm job that hasn’t been submitted yet

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> self = SlurmJob('python -c print("hello world")', 'hi', cpus=5, gpus=1, mem='10GB')
>>> command = self._build_sbatch_args()
>>> print('command = {!r}'.format(command))
>>> self = SlurmJob('python -c print("hello world")', 'hi', cpus=5, gpus=1, mem='10GB', depends=[self])
>>> command = self._build_command()
>>> print(command)
_build_command(jobname_to_varname=None)[source]
_build_sbatch_args(jobname_to_varname=None)[source]
class cmd_queue.slurm_queue.SlurmQueue(name=None, shell=None, **kwargs)[source]

Bases: Queue

CommandLine

xdoctest -m cmd_queue.slurm_queue SlurmQueue

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> self = SlurmQueue()
>>> job0 = self.submit('echo "hi from $SLURM_JOBID"', begin=0)
>>> job1 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job0])
>>> job2 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job1])
>>> job3 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job2])
>>> job4 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job3])
>>> job5 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job4])
>>> job6 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job0])
>>> job7 = self.submit('echo "hi from $SLURM_JOBID"', depends=[job5, job6])
>>> self.write()
>>> self.print_commands()
>>> # xdoctest: +REQUIRES(--run)
>>> if self.is_available():
>>>     self.run()

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> self = SlurmQueue(shell='/bin/bash')
>>> self.add_header_command('export FOO=bar')
>>> job0 = self.submit('echo "$FOO"')
>>> job1 = self.submit('echo "$FOO"', depends=job0)
>>> job2 = self.submit('echo "$FOO"')
>>> job3 = self.submit('echo "$FOO"', depends=job2)
>>> self.sync()
>>> job4 = self.submit('echo "$FOO"')
>>> self.sync()
>>> job5 = self.submit('echo "$FOO"')
>>> self.print_commands()
classmethod is_available()[source]

Determines if we can run the slurm queue or not.

submit(command, **kwargs)[source]
add_header_command(command)[source]
order_jobs()[source]
finalize_text(exclude_tags=None, **kwargs)[source]
run(block=True, system=False, **kw)[source]
monitor(refresh_rate=0.4)[source]

Monitor progress until the jobs are done

kill()[source]
read_state()[source]
print_commands(*args, **kwargs)[source]

Print info about the commands, optionally with rich

Parameters:
  • exclude_tags (List[str] | None) – if specified exclude jobs submitted with these tags.

  • style (str) – can be ‘colors’, ‘rich’, or ‘plain’

CommandLine

xdoctest -m cmd_queue.slurm_queue SlurmQueue.print_commands

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> self = SlurmQueue('test-slurm-queue')
>>> self.submit('echo hi 1')
>>> self.submit('echo hi 2')
>>> self.submit('echo boilerplate', tags='boilerplate')
>>> self.print_commands(with_status=True)
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, exclude_tags='boilerplate')
rprint(*args, **kwargs)

Print info about the commands, optionally with rich

Parameters:
  • exclude_tags (List[str] | None) – if specified exclude jobs submitted with these tags.

  • style (str) – can be ‘colors’, ‘rich’, or ‘plain’

CommandLine

xdoctest -m cmd_queue.slurm_queue SlurmQueue.print_commands

Example

>>> from cmd_queue.slurm_queue import *  # NOQA
>>> self = SlurmQueue('test-slurm-queue')
>>> self.submit('echo hi 1')
>>> self.submit('echo hi 2')
>>> self.submit('echo boilerplate', tags='boilerplate')
>>> self.print_commands(with_status=True)
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, exclude_tags='boilerplate')