cmd_queue.base_queue module¶
- class cmd_queue.base_queue.Job(command=None, name=None, depends=None, **kwargs)[source]¶
Bases:
NiceRepr
Base class for a job
- class cmd_queue.base_queue.Queue[source]¶
Bases:
NiceRepr
Base class for a queue.
Use the
create
classmethod to make a concrete instance with an available backend.- change_backend(backend, **kwargs)[source]¶
Create a new version of this queue with a different backend.
Currently metadata is not carried over. Submit an MR if you need this functionality.
Example
>>> from cmd_queue import Queue >>> self = Queue.create(size=5, name='demo') >>> self.submit('echo "Hello World"', name='job1a') >>> self.submit('echo "Hello Revocable"', name='job1b') >>> self.submit('echo "Hello Crushed"', depends=['job1a'], name='job2a') >>> self.submit('echo "Hello Shadow"', depends=['job1b'], name='job2b') >>> self.submit('echo "Hello Excavate"', depends=['job2a', 'job2b'], name='job3') >>> self.submit('echo "Hello Barrette"', depends=[], name='jobX') >>> self.submit('echo "Hello Overwrite"', depends=['jobX'], name='jobY') >>> self.submit('echo "Hello Giblet"', depends=['jobY'], name='jobZ') >>> serial_backend = self.change_backend('serial') >>> tmux_backend = self.change_backend('tmux') >>> slurm_backend = self.change_backend('slurm') >>> airflow_backend = self.change_backend('airflow') >>> serial_backend.print_commands() >>> tmux_backend.print_commands() >>> slurm_backend.print_commands() >>> airflow_backend.print_commands()
- sync()[source]¶
Mark that all future jobs will depend on the current sink jobs
- Returns:
a reference to the queue (for chaining)
- Return type:
- write()[source]¶
Writes the underlying files that defines the queue for whatever program will ingest it to run it.
- submit(command, **kwargs)[source]¶
- Parameters:
name – specify the name of the job
**kwargs – passed to
cmd_queue.serial_queue.BashJob
- classmethod create(backend='serial', **kwargs)[source]¶
Main entry point to create a queue
- Parameters:
**kwargs – environ (dict | None): environment variables name (str): queue name dpath (str): queue work directory gpus (int): number of gpus size (int): only for tmux queue, number of parallel queues
- print_commands(with_status=False, with_gaurds=False, with_locks=1, exclude_tags=None, style='colors', **kwargs)[source]¶
- Parameters:
with_status (bool) – tmux / serial only, show bash status boilerplate
with_gaurds (bool) – tmux / serial only, show bash guards boilerplate
with_locks (bool | int) – tmux, show tmux lock boilerplate
exclude_tags (List[str] | None) – if specified exclude jobs submitted with these tags.
style (str) – can be ‘colors’, ‘rich’, or ‘plain’
**kwargs – extra backend-specific args passed to finalize_text
CommandLine
xdoctest -m cmd_queue.slurm_queue SlurmQueue.print_commands xdoctest -m cmd_queue.serial_queue SerialQueue.print_commands xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.print_commands
- print_graph(reduced=True, vertical_chains=False)[source]¶
Renders the dependency graph to an “network text”
- Parameters:
reduced (bool) – if True only show the implicit dependency forest
- _dependency_graph()[source]¶
Builds a networkx dependency graph for the current jobs
Example
>>> from cmd_queue import Queue >>> self = Queue.create(size=5, name='foo') >>> job1a = self.submit('echo hello && sleep 0.5') >>> job1b = self.submit('echo hello && sleep 0.5') >>> job2a = self.submit('echo hello && sleep 0.5', depends=[job1a]) >>> job2b = self.submit('echo hello && sleep 0.5', depends=[job1b]) >>> job3 = self.submit('echo hello && sleep 0.5', depends=[job2a, job2b]) >>> jobX = self.submit('echo hello && sleep 0.5', depends=[]) >>> jobY = self.submit('echo hello && sleep 0.5', depends=[jobX]) >>> jobZ = self.submit('echo hello && sleep 0.5', depends=[jobY]) >>> graph = self._dependency_graph() >>> self.print_graph()