cmd_queue.serial_queue module

References

https://jmmv.dev/2018/03/shell-readability-strict-mode.html

https://stackoverflow.com/questions/13195655/bash-set-x-without-it-being-printed

cmd_queue.serial_queue.indent(text, prefix='    ')[source]

Indents a block of text

Parameters:
  • text (str) – text to indent

  • prefix (str, default = '    ') – prefix to add to each line

Returns:

indented text

>>> from cmd_queue.serial_queue import *  # NOQA
>>> text = '\n'.join(['aaaa', 'bb', 'cc\n   dddd\n    ef\n'])
>>> text = indent(text)
>>> print(text)
>>> text = indent(text)
>>> print(text)

Return type:

str
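
A minimal sketch of the behaviour described above, assuming that every line of the input, including the first, receives the prefix. The helper name indent_sketch is hypothetical and is not part of cmd_queue; the library's own implementation may treat empty lines or trailing newlines differently.

```python
def indent_sketch(text: str, prefix: str = '    ') -> str:
    """Prefix every line of ``text`` (hypothetical stand-in for ``indent``)."""
    # splitlines(keepends=True) preserves each line's own newline, so the
    # result has the same line structure as the input.
    return ''.join(prefix + line for line in text.splitlines(keepends=True))


block = 'aaaa\nbb\ncc\n   dddd\n'
once = indent_sketch(block)
twice = indent_sketch(once)  # indentation accumulates on repeated calls
print(once)
print(twice)
```

Calling the helper twice mirrors the doctest above, where the already indented text is indented a second time.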

class cmd_queue.serial_queue.BashJob(command, name=None, depends=None, gpus=None, cpus=None, mem=None, bookkeeper=0, info_dpath=None, log=False, tags=None, allow_indent=True, **kwargs)[source]

Bases: Job

A job meant to run inside a larger bash file. Analogous to SlurmJob.

Variables:
  • name (str) – a name for this job

  • pathid (str) – a unique id based on the name and a hashed uuid

  • command (str) – the shell command to run

  • depends (List[BashJob] | None) – the jobs that this job depends on. This job will only run once all the dependencies have successfully run.

  • bookkeeper (bool) – flag indicating if this is a bookkeeping job or not

  • info_dpath (PathLike | None) – where information about this job will be stored

  • log (bool) – if True, the output of the job is tee-d and saved to a file. This can interact with normal stdout. Defaults to False.

  • tags (List[str] | str | None) – a list of strings that can be used to group jobs or filter the queue or other custom purposes.

  • allow_indent (bool) – In some cases indentation matters for the shell command. In that case, set this to False, at the cost of readability in the resulting script.
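
One case where indentation matters is a here-document: with the plain `<<EOF` form, bash only recognises the closing delimiter when it has no leading whitespace. The sketch below uses only the standard library to show how indenting such a command changes the terminator line. Using textwrap.indent as a stand-in for whatever indentation the queue applies is an assumption made for illustration.

```python
import textwrap

# A shell command containing a here-document. With the plain `<<EOF` form,
# bash only recognises the terminator when `EOF` is alone on the line with
# no leading whitespace.
command = 'cat <<EOF\nhello\nEOF'

# Stand-in for the indentation a generated script might apply (assumption).
indented = textwrap.indent(command, '    ')

original_terminator = command.splitlines()[-1]
indented_terminator = indented.splitlines()[-1]

print(repr(original_terminator))  # 'EOF'
print(repr(indented_terminator))  # '    EOF'
```

For a command like this, passing allow_indent=False keeps the command text unindented, as the description above suggests.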

Todo

  • [ ] What is a good name for a list of jobs that must fail for this job to run? Our current depends is analogous to slurm’s afterok. What is a good variable name for afternotok? Do we wrap the job with some sort of negation, so we depend on the negation of the job?

CommandLine

xdoctest -m cmd_queue.serial_queue BashJob

Example

>>> from cmd_queue.serial_queue import *  # NOQA
>>> # Demo full boilerplate for a job with no dependencies
>>> self = BashJob('echo hi', 'myjob')
>>> self.print_commands(1, 1)

Example

>>> from cmd_queue.serial_queue import *  # NOQA
>>> # Demo full boilerplate for a job with dependencies
>>> dep = BashJob('echo hi', name='job1')
>>> conditionals = {'on_skip': ['echo "CUSTOM MESSAGE FOR WHEN WE SKIP A JOB"']}
>>> self = BashJob('echo hi', name='job2', depends=[dep])
>>> self.print_commands(1, 1, conditionals=conditionals)

finalize_text(with_status=True, with_gaurds=True, conditionals=None, **kwargs)[source]

print_commands(with_status=False, with_gaurds=False, with_rich=None, style='colors', **kwargs)[source]

Print info about the commands, optionally with rich

Parameters:
  • with_status (bool) – tmux / serial only, show bash status boilerplate

  • with_gaurds (bool) – tmux / serial only, show bash guards boilerplate

  • with_locks (bool) – tmux, show tmux lock boilerplate

  • exclude_tags (List[str] | None) – if specified exclude jobs submitted with these tags.

  • style (str) – can be ‘colors’, ‘rich’, or ‘plain’

  • **kwargs – extra backend-specific args passed to finalize_text

CommandLine

xdoctest -m cmd_queue.serial_queue BashJob.print_commands

Example

>>> from cmd_queue.serial_queue import *  # NOQA
>>> self = SerialQueue('test-print-commands-serial-queue')
>>> self.submit('echo hi 1')
>>> self.submit('echo hi 2')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=1, with_gaurds=1, style='rich')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, with_gaurds=1, style='rich')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, with_gaurds=0, style='rich')

class cmd_queue.serial_queue.SerialQueue(name='', dpath=None, rootid=None, environ=None, cwd=None, **kwargs)[source]

Bases: Queue

A linear job queue written to a single bash file

Example

>>> from cmd_queue.serial_queue import *  # NOQA
>>> self = SerialQueue('test-serial-queue', rootid='test-serial')
>>> job1 = self.submit('echo "this job fails" && false')
>>> job2 = self.submit('echo "this job works" && true')
>>> job3 = self.submit('echo "this job wont run" && true', depends=job1)
>>> self.print_commands(1, 1)
>>> self.run()
>>> state = self.read_state()
>>> print('state = {}'.format(ub.repr2(state, nl=1)))

Example

>>> # Test case where a job fails
>>> from cmd_queue.serial_queue import *  # NOQA
>>> self = SerialQueue('test-serial-queue', rootid='test-serial')
>>> job1 = self.submit('echo "job1 fails" && false')
>>> job2 = self.submit('echo "job2 never runs"', depends=[job1])
>>> job3 = self.submit('echo "job3 never runs"', depends=[job2])
>>> job4 = self.submit('echo "job4 passes" && true')
>>> job5 = self.submit('echo "job5 fails" && false', depends=[job4])
>>> job6 = self.submit('echo "job6 never runs"', depends=[job5])
>>> job7 = self.submit('echo "job7 never runs"', depends=[job4, job2])
>>> job8 = self.submit('echo "job8 never runs"', depends=[job4, job1])
>>> self.print_commands(1, 1)
>>> self.run()
>>> self.read_state()
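
The outcome of the second example can be reasoned about without running bash. The sketch below is a pure-Python simulation, not cmd_queue code: it assumes a job runs only if all of its dependencies passed and is otherwise skipped, which matches the afterok-style semantics described for depends. The simulate helper and the status labels are hypothetical.

```python
def simulate(jobs):
    """
    jobs: list of (name, succeeds, depends) in execution order.
    Returns a dict mapping each job name to 'passed', 'failed' or 'skipped'.
    """
    status = {}
    for name, succeeds, depends in jobs:
        if all(status[dep] == 'passed' for dep in depends):
            # Every dependency passed, so the command actually runs.
            status[name] = 'passed' if succeeds else 'failed'
        else:
            # At least one dependency failed or was itself skipped.
            status[name] = 'skipped'
    return status


jobs = [
    ('job1', False, []),
    ('job2', True, ['job1']),
    ('job3', True, ['job2']),
    ('job4', True, []),
    ('job5', False, ['job4']),
    ('job6', True, ['job5']),
    ('job7', True, ['job4', 'job2']),
    ('job8', True, ['job4', 'job1']),
]
status = simulate(jobs)
for name, state in status.items():
    print(name, state)
```

Under these assumptions only job4 passes; job1 and job5 fail, and every job downstream of a failed or skipped job is skipped.
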

property pathid

A path-safe identifier for file names

classmethod is_available()[source]

This queue is always available.

order_jobs()[source]

Ensure jobs within a serial queue are topologically ordered. Attempts to preserve input ordering.
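
One way to obtain a topological order that stays close to the submission order is Kahn's algorithm with a priority queue keyed on the original position. This is a sketch of that general technique using only the standard library; it is not taken from cmd_queue, and the function name stable_toposort is hypothetical.

```python
import heapq


def stable_toposort(names, depends):
    """
    names: job names in submission order.
    depends: dict mapping a job name to the names it depends on.
    Returns the names in a dependency-respecting order that prefers
    earlier-submitted jobs whenever there is a choice.
    """
    position = {name: i for i, name in enumerate(names)}
    remaining = {name: len(set(depends.get(name, ()))) for name in names}
    children = {name: [] for name in names}
    for name in names:
        for dep in set(depends.get(name, ())):
            children[dep].append(name)

    # Jobs with no unmet dependencies, ordered by submission position.
    ready = [position[n] for n in names if remaining[n] == 0]
    heapq.heapify(ready)

    order = []
    while ready:
        name = names[heapq.heappop(ready)]
        order.append(name)
        for child in children[name]:
            remaining[child] -= 1
            if remaining[child] == 0:
                heapq.heappush(ready, position[child])

    if len(order) != len(names):
        raise ValueError('dependency cycle detected')
    return order


# 'b' was submitted before 'c' but depends on it, so 'c' must move ahead.
order = stable_toposort(['a', 'b', 'c', 'd'], {'b': ['c']})
print(order)
```

Jobs that are already in a valid order keep their input positions; only jobs submitted ahead of their dependencies are moved.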

finalize_text(with_status=True, with_gaurds=True, with_locks=True, exclude_tags=None)[source]

Create the bash script that will:

  1. Run all of the jobs in this queue.

  2. Track the results.

  3. Prevent jobs with unmet dependencies from running.
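
The three responsibilities listed above can be illustrated with a small generator. This is a hedged sketch, not the script cmd_queue emits: the render_script helper, the _STATUS_<name> variable names, and the layout are all assumptions chosen for illustration, and job names are assumed to be valid bash identifiers.

```python
def render_script(jobs):
    """
    jobs: list of (name, command, depends) tuples, already in a valid order.
    Returns the text of a bash script as a string.
    """
    lines = ['#!/bin/bash']
    for name, command, depends in jobs:
        var = f'_STATUS_{name}'
        if depends:
            # 3. Guard: only run when every dependency exited with 0.
            checks = ' && '.join(
                f'[ "${{_STATUS_{dep}}}" = "0" ]' for dep in depends)
            lines.append(f'if {checks}; then')
            lines.append(f'    {command}')      # 1. run the job
            lines.append(f'    {var}=$?')       # 2. track the result
            lines.append('else')
            lines.append(f'    {var}=skipped')
            lines.append('fi')
        else:
            lines.append(command)               # 1. run the job
            lines.append(f'{var}=$?')           # 2. track the result
    return '\n'.join(lines) + '\n'


script = render_script([
    ('job1', 'echo "this job fails" && false', []),
    ('job2', 'echo "this job wont run" && true', ['job1']),
])
print(script)
```

Because a skipped job stores a status other than 0, anything that depends on it is skipped as well.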

add_header_command(command)[source]

print_commands(*args, **kwargs)[source]

Print info about the commands, optionally with rich

CommandLine

xdoctest -m cmd_queue.serial_queue SerialQueue.print_commands

Example

>>> from cmd_queue.serial_queue import *  # NOQA
>>> self = SerialQueue('test-serial-queue')
>>> self.submit('echo hi 1')
>>> self.submit('echo hi 2')
>>> self.submit('echo boilerplate', tags='boilerplate')
>>> self.print_commands(with_status=True)
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, exclude_tags='boilerplate')

rprint(*args, **kwargs)

Print info about the commands, optionally with rich

The command line and example are the same as those shown for print_commands above.

run(block=True, system=False, shell=1, capture=True, mode='bash', verbose=3, **kw)[source]

job_details()[source]

read_state()[source]

cmd_queue.serial_queue._bash_json_dump(json_fmt_parts, fpath)[source]

Make a printf command that dumps a json file indicating some status in a bash environment.

Parameters:
  • json_fmt_parts (List[Tuple[str, str, str]]) – A list of 3-tuples indicating the name of the json key, the printf code, and the bash expression to fill the printf code.

  • fpath (str) – where bash should write the json file

Returns:

the bash that will perform the printf

Return type:

str

Example

>>> from cmd_queue.serial_queue import _bash_json_dump
>>> json_fmt_parts = [
>>>     ('home', '%s', '$HOME'),
>>>     ('const', '%s', 'MY_CONSTANT'),
>>>     ('ps2', '"%s"', '$PS2'),
>>> ]
>>> fpath = 'out.json'
>>> dump_code = _bash_json_dump(json_fmt_parts, fpath)
>>> print(dump_code)
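
For orientation, here is one way such a printf command could be assembled. It is a sketch under stated assumptions, not the library's implementation: the helper name bash_json_dump_sketch and the exact quoting are hypothetical, and the real function may format its output differently.

```python
def bash_json_dump_sketch(json_fmt_parts, fpath):
    """
    Build a bash printf command that writes a one-line JSON object.

    json_fmt_parts: list of (key, printf_code, bash_expression) tuples.
    fpath: path of the file that bash should write.
    """
    # The JSON template pairs each key with its printf code, for example
    # "ret": %s. Any quoting of the value is part of the printf code.
    body = ', '.join(f'"{key}": {code}' for key, code, _ in json_fmt_parts)
    template = "'{" + body + "}\\n'"
    # Each bash expression is double-quoted so it expands to a single word.
    args = ' '.join(f'"{expr}"' for _, _, expr in json_fmt_parts)
    return f'printf {template} {args} > {fpath}'


parts = [
    ('user', '"%s"', '$USER'),
    ('ret', '%s', '$?'),
]
dump_code = bash_json_dump_sketch(parts, 'out.json')
print(dump_code)
```

Putting the quotes in the printf code, as in '"%s"', is what turns a substituted value into a JSON string; a bare '%s' is only valid JSON when the expression expands to a number, true, false, or null.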