cmd_queue.base_queue module

exception cmd_queue.base_queue.DuplicateJobError[source]

Bases: KeyError

exception cmd_queue.base_queue.UnknownBackendError[source]

Bases: KeyError

class cmd_queue.base_queue.Job(command=None, name=None, depends=None, **kwargs)[source]

Bases: NiceRepr

Base class for a job

class cmd_queue.base_queue.Queue[source]

Bases: NiceRepr

Base class for a queue.

Use the create classmethod to make a concrete instance with an available backend.

change_backend(backend, **kwargs)[source]

Create a new version of this queue with a different backend.

Currently metadata is not carried over. Submit an MR if you need this functionality.

Example

>>> from cmd_queue import Queue
>>> self = Queue.create(size=5, name='demo')
>>> self.submit('echo "Hello World"', name='job1a')
>>> self.submit('echo "Hello Revocable"', name='job1b')
>>> self.submit('echo "Hello Crushed"', depends=['job1a'], name='job2a')
>>> self.submit('echo "Hello Shadow"', depends=['job1b'], name='job2b')
>>> self.submit('echo "Hello Excavate"', depends=['job2a', 'job2b'], name='job3')
>>> self.submit('echo "Hello Barrette"', depends=[], name='jobX')
>>> self.submit('echo "Hello Overwrite"', depends=['jobX'], name='jobY')
>>> self.submit('echo "Hello Giblet"', depends=['jobY'], name='jobZ')
>>> serial_backend = self.change_backend('serial')
>>> tmux_backend = self.change_backend('tmux')
>>> slurm_backend = self.change_backend('slurm')
>>> airflow_backend = self.change_backend('airflow')
>>> serial_backend.print_commands()
>>> tmux_backend.print_commands()
>>> slurm_backend.print_commands()
>>> airflow_backend.print_commands()
sync()[source]

Mark that all future jobs will depend on the current sink jobs

Returns:

a reference to the queue (for chaining)

Return type:

Queue

write()[source]

Writes the underlying files that defines the queue for whatever program will ingest it to run it.

submit(command, **kwargs)[source]
Parameters:
classmethod _backend_classes()[source]
classmethod available_backends()[source]
classmethod create(backend='serial', **kwargs)[source]

Main entry point to create a queue

Parameters:

**kwargs – environ (dict | None): environment variables name (str): queue name dpath (str): queue work directory gpus (int): number of gpus size (int): only for tmux queue, number of parallel queues

write_network_text(reduced=True, rich='auto', vertical_chains=False)[source]
print_commands(with_status=False, with_gaurds=False, with_locks=1, exclude_tags=None, style='colors', **kwargs)[source]
Parameters:
  • with_status (bool) – tmux / serial only, show bash status boilerplate

  • with_gaurds (bool) – tmux / serial only, show bash guards boilerplate

  • with_locks (bool | int) – tmux, show tmux lock boilerplate

  • exclude_tags (List[str] | None) – if specified exclude jobs submitted with these tags.

  • style (str) – can be ‘colors’, ‘rich’, or ‘plain’

  • **kwargs – extra backend-specific args passed to finalize_text

CommandLine

xdoctest -m cmd_queue.slurm_queue SlurmQueue.print_commands
xdoctest -m cmd_queue.serial_queue SerialQueue.print_commands
xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.print_commands
rprint(**kwargs)[source]
print_graph(reduced=True, vertical_chains=False)[source]

Renders the dependency graph to an “network text”

Parameters:

reduced (bool) – if True only show the implicit dependency forest

_dependency_graph()[source]

Builds a networkx dependency graph for the current jobs

Example

>>> from cmd_queue import Queue
>>> self = Queue.create(size=5, name='foo')
>>> job1a = self.submit('echo hello && sleep 0.5')
>>> job1b = self.submit('echo hello && sleep 0.5')
>>> job2a = self.submit('echo hello && sleep 0.5', depends=[job1a])
>>> job2b = self.submit('echo hello && sleep 0.5', depends=[job1b])
>>> job3 = self.submit('echo hello && sleep 0.5', depends=[job2a, job2b])
>>> jobX = self.submit('echo hello && sleep 0.5', depends=[])
>>> jobY = self.submit('echo hello && sleep 0.5', depends=[jobX])
>>> jobZ = self.submit('echo hello && sleep 0.5', depends=[jobY])
>>> graph = self._dependency_graph()
>>> self.print_graph()
monitor()[source]
_coerce_style(style='auto', with_rich=None, colors=1)[source]