cmd_queue.tmux_queue module

A very simple queue based on tmux and bash

It should be possible to add more functionality, such as:

  • [x] A linear job queue - via one tmux shell

  • [x] Multiple linear job queues - via multiple tmux shells

  • [x] Ability to query status of jobs - the tmux script writes status to a file, and a secondary thread reads it.

  • [x] Unique identifier per queue

  • [ ] Central scheduler - given that we can know when a job is done, a central scheduling process can run in the background, check the status of existing jobs, and spawn new jobs. Maybe not needed.

  • [x] Dependencies between jobs - given a central scheduler, it can only spawn a new job if its dependencies have been met.

  • [ ] GPU resource requirements - if a job indicates how much of a particular resource it needs, the scheduler can only schedule the next job if it “fits” given the resources taken by the currently running jobs.

  • [x] Duck typed API that uses Slurm if available. Slurm is a robust, full-featured queuing system. If it is available, we should make it easy for the user to swap the tmux queue for Slurm.

  • [x] Duck typed API that uses subprocesses. Tmux is not always available, so we could go even lighter weight and simply execute a subprocess that does the same thing as the linear queue. The downside is that you don’t get the nice tmux view of what the jobs are doing, but that doesn’t matter in debugged automated workflows, and this does seem like a nice, simple utility. It doesn't seem to exist elsewhere either, but my search terms might be wrong.

  • [ ] Handle the case where some jobs need the GPU and others do not
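The status-file item in the list above can be illustrated with a small sketch. It is not the module's actual script format: the JSON layout and the helper names `render_worker_script` and `read_status` are hypothetical. Each worker's bash script runs its jobs in order and rewrites a status file after every job, so any other process (for example a monitoring thread) can read the latest counts at any time.

```python
import json
import shlex
from pathlib import Path


def render_worker_script(commands, status_fpath):
    """Render a bash script that runs ``commands`` in order and records
    progress in ``status_fpath`` (hypothetical JSON format)."""
    status_q = shlex.quote(str(status_fpath))
    total = len(commands)
    # printf format for one status line; \n is interpreted by bash's printf
    fmt = '{"passed": %d, "failed": %d, "total": ' + str(total) + '}\\n'
    lines = ['#!/bin/bash', 'passed=0', 'failed=0']
    for command in commands:
        lines.append(command)
        # Count the outcome of the command that just ran.
        lines.append('if [ $? -eq 0 ]; then passed=$((passed + 1)); '
                     'else failed=$((failed + 1)); fi')
        # Rewrite the whole status file so readers always see the latest counts.
        lines.append(f"printf '{fmt}' \"$passed\" \"$failed\" > {status_q}")
    return '\n'.join(lines) + '\n'


def read_status(status_fpath):
    """Read the status file; return None if the worker has not written it yet."""
    path = Path(status_fpath)
    if not path.exists():
        return None
    return json.loads(path.read_text())
```

Rewriting the file after each job (rather than appending) keeps the reader trivial: it only ever needs the latest snapshot.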

Example

>>> import cmd_queue
>>> queue = cmd_queue.Queue.create(backend='tmux')
>>> job1 = queue.submit('echo "Hello World" && sleep 0.1')
>>> job2 = queue.submit('echo "Hello Kitty" && sleep 0.1', depends=[job1])
>>> if queue.is_available():
>>>     queue.run()
class cmd_queue.tmux_queue.TMUXMultiQueue(size=1, name=None, dpath=None, rootid=None, environ=None, gpus=None, gres=None)

Bases: Queue

Create multiple sets of jobs to start in detached tmux sessions.
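Each worker runs in its own detached tmux session. A minimal sketch of the launch commands, assuming one bash script per worker and a hypothetical `<queue-name>_<index>` session naming scheme (the real naming and script locations may differ). `tmux new-session -d -s NAME COMMAND` is standard tmux usage for starting a detached session.

```python
import shlex
import subprocess


def build_session_commands(queue_name, script_fpaths):
    """Return one argv list per worker that would start it in a detached
    tmux session. The session naming scheme is hypothetical."""
    argvs = []
    for idx, script_fpath in enumerate(script_fpaths):
        session_name = f'{queue_name}_{idx}'
        argvs.append([
            'tmux', 'new-session', '-d',             # -d: start detached
            '-s', session_name,                      # session name
            'bash ' + shlex.quote(str(script_fpath)),  # shell command to run
        ])
    return argvs


def start_sessions(queue_name, script_fpaths):
    """Launch the sessions (requires tmux on the PATH)."""
    for argv in build_session_commands(queue_name, script_fpaths):
        subprocess.run(argv, check=True)
```

Because the sessions are detached, the jobs keep running after the submitting process exits, and a user can still attach to any worker to watch it.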

CommandLine

xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue:0
xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue:2

Example

>>> from cmd_queue.tmux_queue import *  # NOQA
>>> self = TMUXMultiQueue(1, 'test-tmux-queue')
>>> job1 = self.submit('echo hi 1 && false')
>>> job2 = self.submit('echo hi 2 && true')
>>> job3 = self.submit('echo hi 3 && true', depends=job1)
>>> self.print_commands()
>>> self.print_graph()
>>> if self.is_available():
>>>     self.run(block=True, onexit='capture', check_other_sessions=0)

Example

>>> from cmd_queue.tmux_queue import *  # NOQA
>>> import random
>>> rng = random.Random(54425367001)
>>> self = TMUXMultiQueue(1, 'real-world-usecase', gpus=[0, 1])
>>> def add_edge(name, depends):
>>>     if name is not None:
>>>         _depends = [self.named_jobs[n] for n in depends if n is not None]
>>>         self.submit(f'echo name={name}, depends={depends} && sleep 0.1', name=name, depends=_depends)
>>> def add_branch(suffix):
>>>     f = 0.3
>>>     pred = f'pred{suffix}' if rng.random() > f else None
>>>     track = f'track{suffix}' if rng.random() > f else None
>>>     actclf = f'actclf{suffix}' if rng.random() > f else None
>>>     pxl_eval = f'pxl_eval{suffix}' if rng.random() > f else None
>>>     trk_eval = f'trk_eval{suffix}' if rng.random() > f else None
>>>     act_eval = f'act_eval{suffix}' if rng.random() > f else None
>>>     add_edge(pred, [])
>>>     add_edge(track, [pred])
>>>     add_edge(actclf, [pred])
>>>     add_edge(pxl_eval, [pred])
>>>     add_edge(trk_eval, [track])
>>>     add_edge(act_eval, [actclf])
>>> for i in range(3):
>>>     add_branch(str(i))
>>> self.print_commands()
>>> self.print_graph()
>>> if self.is_available():
>>>     self.run(block=1, onexit='', check_other_sessions=0)

Example

>>> from cmd_queue.tmux_queue import TMUXMultiQueue
>>> self = TMUXMultiQueue(size=2, name='foo')
>>> print('self = {!r}'.format(self))
>>> job1 = self.submit('echo hello && sleep 0.5')
>>> job2 = self.submit('echo world && sleep 0.5', depends=[job1])
>>> job3 = self.submit('echo foo && sleep 0.5')
>>> job4 = self.submit('echo bar && sleep 0.5')
>>> job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
>>> job6 = self.submit('echo spam && sleep 0.5')
>>> job7 = self.submit('echo err && false')
>>> job8 = self.submit('echo spam && sleep 0.5')
>>> job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
>>> job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])
>>> self.write()
>>> self.print_commands()
>>> if self.is_available():
>>>     self.run(check_other_sessions=0)
>>>     self.monitor()
>>>     self.current_output()
>>>     self.kill()

Example

>>> # Test complex failure case
>>> from cmd_queue import Queue
>>> self = Queue.create(size=2, name='demo-complex-failure', backend='tmux')
>>> # Submit a binary tree that fails at different levels
>>> for idx in range(2):
>>>     # Level 0
>>>     job1000 = self.submit('true')
>>>     # Level 1
>>>     job1100 = self.submit('true', depends=[job1000])
>>>     job1200 = self.submit('false', depends=[job1000], name=f'false0_{idx}')
>>>     # Level 2
>>>     job1110 = self.submit('true', depends=[job1100])
>>>     job1120 = self.submit('false', depends=[job1100], name=f'false1_{idx}')
>>>     job1210 = self.submit('true', depends=[job1200])
>>>     job1220 = self.submit('true', depends=[job1200])
>>>     # Level 3
>>>     job1111 = self.submit('true', depends=[job1110])
>>>     job1112 = self.submit('false', depends=[job1110], name=f'false2_{idx}')
>>>     job1121 = self.submit('true', depends=[job1120])
>>>     job1122 = self.submit('true', depends=[job1120])
>>>     job1211 = self.submit('true', depends=[job1210])
>>>     job1212 = self.submit('true', depends=[job1210])
>>>     job1221 = self.submit('true', depends=[job1220])
>>>     job1222 = self.submit('true', depends=[job1220])
>>> # Submit a chain that fails in the middle
>>> chain1 = self.submit('true', name='chain1')
>>> chain2 = self.submit('true', depends=[chain1], name='chain2')
>>> chain3 = self.submit('false', depends=[chain2], name='chain3')
>>> chain4 = self.submit('true', depends=[chain3], name='chain4')
>>> chain5 = self.submit('true', depends=[chain4], name='chain5')
>>> # Submit 4 loose passing jobs
>>> for _ in range(4):
>>>     self.submit('true', name=f'loose_true{_}')
>>> # Submit 4 loose failing jobs
>>> for _ in range(4):
>>>     self.submit('false', name=f'loose_false{_}')
>>> self.print_commands()
>>> self.print_graph()
>>> if self.is_available():
>>>     self.run(with_textual=False, check_other_sessions=0)
classmethod is_available()

Determines if we can run the tmux queue or not.
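A minimal sketch of such a check, assuming availability simply means that a `tmux` executable is on the PATH (the real method may check more than this):

```python
import shutil


def tmux_is_available():
    """Return True if a tmux executable can be found on the PATH."""
    return shutil.which('tmux') is not None
```

This is the kind of guard the examples use via `if self.is_available():` before calling `run`.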

_new_workers(start=0)
_semaphore_wait_command(flag_fpaths, msg)

TODO: use flock?

_semaphore_signal_command(flag_fpath)
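The wait/signal pair coordinates dependencies between workers running in separate tmux sessions. A minimal sketch of one way to do this with flag files, assuming a finished job signals by creating a file and a dependent job polls until every file it needs exists. The helper names and polling interval are hypothetical and the module's generated commands may differ (the TODO above suggests `flock` as an alternative).

```python
import shlex


def semaphore_signal_command(flag_fpath):
    """Bash command that marks a job as finished by creating its flag file."""
    return f'touch {shlex.quote(str(flag_fpath))}'


def semaphore_wait_command(flag_fpaths, msg, poll_seconds=1):
    """Bash snippet that blocks until every flag file in ``flag_fpaths`` exists."""
    if not flag_fpaths:
        return 'true'  # nothing to wait for
    # Build a test like: [ -f a ] && [ -f b ]
    conditions = ' && '.join(
        f'[ -f {shlex.quote(str(p))} ]' for p in flag_fpaths)
    return '\n'.join([
        f'echo {shlex.quote(msg)}',
        f'until {conditions}; do',
        f'    sleep {poll_seconds}',
        'done',
    ])
```

Polling for file existence is simple and works across independent shells, at the cost of up to `poll_seconds` of latency per dependency.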
order_jobs()

TODO: ability to shuffle jobs subject to graph constraints
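One simple way to respect the graph constraints is to topologically sort the jobs and then assign each one to a worker. The sketch below uses the standard-library `graphlib.TopologicalSorter` (Python 3.9+) with a hypothetical placement rule: a job goes to the worker that ran its first dependency, otherwise to the least-loaded worker. It is not necessarily the strategy `order_jobs` implements; jobs whose dependencies land on another worker still need a cross-worker wait such as the `_semaphore_*` commands.

```python
from graphlib import TopologicalSorter


def assign_jobs(depends_on, num_workers):
    """Split jobs into ``num_workers`` linear queues in dependency order.

    ``depends_on`` maps each job name to a list of the jobs it needs.
    Returns a list of per-worker job lists.
    """
    order = list(TopologicalSorter(depends_on).static_order())
    workers = [[] for _ in range(num_workers)]
    worker_of = {}
    for job in order:
        deps = depends_on.get(job, ())
        if deps:
            # Prefer the worker of the first dependency to avoid a cross-worker wait.
            idx = worker_of[deps[0]]
        else:
            # Otherwise start it on the least-loaded worker.
            idx = min(range(num_workers), key=lambda i: len(workers[i]))
        workers[idx].append(job)
        worker_of[job] = idx
    return workers
```

Because jobs are appended in topological order, any dependency placed on the same worker always appears earlier in that worker's queue.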

Example

>>> from cmd_queue.tmux_queue import *  # NOQA
>>> self = TMUXMultiQueue(5, 'foo')
>>> job1a = self.submit('echo hello && sleep 0.5')
>>> job1b = self.submit('echo hello && sleep 0.5')
>>> job2a = self.submit('echo hello && sleep 0.5', depends=[job1a])
>>> job2b = self.submit('echo hello && sleep 0.5', depends=[job1b])
>>> job3 = self.submit('echo hello && sleep 0.5', depends=[job2a, job2b])
>>> self.print_commands()
>>> # self.run(block=True, check_other_sessions=0)

Example

>>> from cmd_queue.tmux_queue import *  # NOQA
>>> self = TMUXMultiQueue(5, 'foo')
>>> job0 = self.submit('true')
>>> job1 = self.submit('true')
>>> job2 = self.submit('true', depends=[job0])
>>> job3 = self.submit('true', depends=[job1])
>>> #job2c = self.submit('true', depends=[job1a, job1b])
>>> #self.sync()
>>> job4 = self.submit('true', depends=[job2, job3, job1])
>>> job5 = self.submit('true', depends=[job4])
>>> job6 = self.submit('true', depends=[job4])
>>> job7 = self.submit('true', depends=[job4])
>>> job8 = self.submit('true', depends=[job5])
>>> job9 = self.submit('true', depends=[job6])
>>> job10 = self.submit('true', depends=[job6])
>>> job11 = self.submit('true', depends=[job7])
>>> job12 = self.submit('true', depends=[job10, job11])
>>> job13 = self.submit('true', depends=[job4])
>>> job14 = self.submit('true', depends=[job13])
>>> job15 = self.submit('true', depends=[job4])
>>> job16 = self.submit('true', depends=[job15, job13])
>>> job17 = self.submit('true', depends=[job4])
>>> job18 = self.submit('true', depends=[job17])
>>> job19 = self.submit('true', depends=[job14, job16, job17])
>>> self.print_graph(reduced=False)
...
Graph:
╟── foo-job-0
╎   └─╼ foo-job-2
╎       └─╼ foo-job-4 ╾ foo-job-3, foo-job-1
╎           ├─╼ foo-job-5
╎           │   └─╼ foo-job-8
╎           ├─╼ foo-job-6
╎           │   ├─╼ foo-job-9
╎           │   └─╼ foo-job-10
╎           │       └─╼ foo-job-12 ╾ foo-job-11
╎           ├─╼ foo-job-7
╎           │   └─╼ foo-job-11
╎           │       └─╼  ...
╎           ├─╼ foo-job-13
╎           │   ├─╼ foo-job-14
╎           │   │   └─╼ foo-job-19 ╾ foo-job-16, foo-job-17
╎           │   └─╼ foo-job-16 ╾ foo-job-15
╎           │       └─╼  ...
╎           ├─╼ foo-job-15
╎           │   └─╼  ...
╎           └─╼ foo-job-17
╎               ├─╼ foo-job-18
╎               └─╼  ...
╙── foo-job-1
    ├─╼ foo-job-3
    │   └─╼  ...
    └─╼  ...
>>> self.print_commands()
>>> # self.run(block=True)

Example

>>> from cmd_queue.tmux_queue import *  # NOQA
>>> self = TMUXMultiQueue(2, 'test-order-case')
>>> self.submit('echo slow1', name='slow1')
>>> self.submit('echo fast1', name='fast1')
>>> self.submit('echo slow2', name='slow2')
>>> self.submit('echo fast2', name='fast2')
>>> self.submit('echo slow3', name='slow3')
>>> self.submit('echo fast3', name='fast3')
>>> self.submit('echo slow4', name='slow4')
>>> self.submit('echo fast4', name='fast4')
>>> self.print_graph(reduced=False)
>>> self.print_commands()
add_header_command(command)

Adds a header command that is run at the start of each queue.
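A minimal sketch of the idea, assuming header commands are stored in a list and prepended to every worker's script. `HeaderedScripts` and its attributes are hypothetical names used only to show the ordering, not the module's internals.

```python
class HeaderedScripts:
    """Hypothetical stand-in showing header commands prepended to each worker."""

    def __init__(self, num_workers):
        self.header_commands = []
        self.worker_commands = [[] for _ in range(num_workers)]

    def add_header_command(self, command):
        self.header_commands.append(command)

    def finalize_text(self):
        # Every worker script starts with the same header lines, then its own jobs.
        return ['\n'.join(self.header_commands + cmds)
                for cmds in self.worker_commands]
```

A typical use is environment setup (for example exporting a variable) that every worker needs before its first job.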

finalize_text(**kwargs)
write()
kill_other_queues(ask_first=True)

Find other tmux sessions that look like they were started with cmd_queue and kill them.
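A minimal sketch of finding and killing such sessions with the standard tmux CLI (`tmux list-sessions -F '#{session_name}'` and `tmux kill-session -t NAME`). The `cmdq_` prefix used to recognize cmd_queue sessions is a hypothetical naming convention, not necessarily the one this module uses.

```python
import subprocess


def find_matching_sessions(session_names, prefix='cmdq_'):
    """Filter tmux session names down to those that look like queue workers."""
    return [name for name in session_names if name.startswith(prefix)]


def list_tmux_sessions():
    """Return running tmux session names (requires tmux; empty if no server)."""
    proc = subprocess.run(['tmux', 'list-sessions', '-F', '#{session_name}'],
                          capture_output=True, text=True)
    if proc.returncode != 0:
        return []
    return proc.stdout.splitlines()


def kill_other_queues(prefix='cmdq_', ask_first=True):
    for name in find_matching_sessions(list_tmux_sessions(), prefix):
        if ask_first and input(f'Kill tmux session {name!r}? [y/N] ') != 'y':
            continue
        subprocess.run(['tmux', 'kill-session', '-t', name], check=False)
```

Keeping the name filter as a pure function makes the matching rule easy to test without a tmux server.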

handle_other_sessions(other_session_handler)
run(block=True, onfail='kill', onexit='', system=False, with_textual='auto', check_other_sessions=None, other_session_handler='auto', **kw)

Execute the queue.

Parameters:

other_session_handler (str) – How to handle potentially conflicting existing tmux runners with the same queue name. Can be ‘kill’, ‘ask’, ‘ignore’, or ‘auto’; ‘auto’ resolves to ‘ask’ if stdin is available and to ‘kill’ if it is not.
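The ‘auto’ rule can be sketched as a small resolution step. Using `sys.stdin.isatty()` as the test for "stdin is available" is an assumption; the module's own `has_stdin` helper may check something different.

```python
import sys

VALID_HANDLERS = {'kill', 'ask', 'ignore', 'auto'}


def has_stdin():
    """Assumed check: treat an interactive terminal on stdin as 'available'."""
    try:
        return sys.stdin is not None and sys.stdin.isatty()
    except ValueError:  # stdin has been closed
        return False


def resolve_session_handler(handler, stdin_available=None):
    if handler not in VALID_HANDLERS:
        raise ValueError(f'unknown other_session_handler: {handler!r}')
    if handler != 'auto':
        return handler
    if stdin_available is None:
        stdin_available = has_stdin()
    # 'auto' asks when someone can answer, otherwise kills without prompting.
    return 'ask' if stdin_available else 'kill'
```

Falling back to ‘kill’ without a terminal avoids a non-interactive job (for example in CI) blocking forever on a prompt.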

read_state()
serial_run()

Hack to run everything without tmux. This really should be a different “queue” backend.

See Serial Queue instead
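Running the queue without tmux amounts to executing the jobs one at a time in dependency order. A minimal sketch with `subprocess` and `graphlib`, assuming a job whose dependency did not pass is skipped rather than run; `run_serially` and its return format are hypothetical, and the Serial Queue backend is the supported way to do this.

```python
import subprocess
from graphlib import TopologicalSorter


def run_serially(commands, depends_on=None):
    """Run shell ``commands`` (name -> command string) in dependency order.

    Every dependency must itself be a key of ``commands``.
    Returns a dict mapping each job name to 'passed', 'failed', or 'skipped'.
    """
    depends_on = depends_on or {}
    graph = {name: list(depends_on.get(name, ())) for name in commands}
    status = {}
    for name in TopologicalSorter(graph).static_order():
        if any(status.get(dep) != 'passed' for dep in graph[name]):
            status[name] = 'skipped'  # a dependency failed or was skipped
            continue
        proc = subprocess.run(commands[name], shell=True)
        status[name] = 'passed' if proc.returncode == 0 else 'failed'
    return status
```

Skips propagate: anything downstream of a failure is marked ‘skipped’, which mirrors how the failing chain in the earlier examples stops at its first `false`.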

monitor(refresh_rate=0.4, with_textual='auto')

Monitor progress until the jobs are done
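Monitoring reduces to polling the worker state every `refresh_rate` seconds until every job is accounted for. A minimal sketch, assuming a hypothetical `read_state` callable that returns one dict per worker with `passed`, `failed`, `skipped`, and `total` counts (the real state format is not documented here):

```python
import time


def monitor(read_state, refresh_rate=0.4, report=print):
    """Poll ``read_state()`` until all jobs are finished; return the final state."""
    while True:
        state = read_state()
        finished = sum(w['passed'] + w['failed'] + w['skipped'] for w in state)
        total = sum(w['total'] for w in state)
        report(f'{finished}/{total} jobs finished')
        if finished >= total:
            return state
        time.sleep(refresh_rate)
```

The rich and textual monitors add a live table on top of the same kind of loop; `report` stands in for that display here.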

CommandLine

xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:0
xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:1 --interact

Example

>>> # xdoctest: +REQUIRES(--interact)
>>> from cmd_queue.tmux_queue import *  # NOQA
>>> self = TMUXMultiQueue(size=3, name='test-queue-monitor')
>>> job = None
>>> for i in range(10):
>>>     job = self.submit('sleep 2 && echo "hello 2"', depends=job)
>>> job = None
>>> for i in range(10):
>>>     job = self.submit('sleep 3 && echo "hello 2"', depends=job)
>>> job = None
>>> for i in range(5):
>>>     job = self.submit('sleep 5 && echo "hello 2"', depends=job)
>>> self.print_commands()
>>> if self.is_available():
>>>     self.run(block=True, check_other_sessions=0)

Example

>>> # xdoctest: +REQUIRES(env:INTERACTIVE_TEST)
>>> from cmd_queue.tmux_queue import *  # NOQA
>>> # Setup a lot of longer running jobs
>>> n = 2
>>> self = TMUXMultiQueue(size=n, name='demo_cmd_queue')
>>> first_job = None
>>> for i in range(n):
...     prev_job = None
...     for j in range(4):
...         command = f'sleep 1 && echo "This is job {i}.{j}"'
...         job = self.submit(command, depends=prev_job)
...         prev_job = job
...         first_job = first_job or job
>>> command = f'sleep 1 && echo "this is the last job"'
>>> job = self.submit(command, depends=[prev_job, first_job])
>>> self.print_commands(style='rich')
>>> self.print_graph()
>>> if self.is_available():
...     self.run(block=True, other_session_handler='kill')
_textual_monitor()
_simple_rich_monitor(refresh_rate=0.4)
_build_status_table()
print_commands(*args, **kwargs)

Print info about the commands, optionally with rich

Parameters:
  • with_status (bool) – tmux / serial only, show bash status boilerplate

  • with_gaurds (bool) – tmux / serial only, show bash guards boilerplate

  • with_locks (bool) – tmux, show tmux lock boilerplate

  • exclude_tags (List[str] | None) – if specified exclude jobs submitted with these tags.

  • style (str) – can be ‘colors’, ‘rich’, or ‘plain’

  • **kwargs – extra backend-specific args passed to finalize_text
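The `exclude_tags` filter accepts either a single tag or a list (the example passes the plain string `'boilerplate'`). A minimal sketch of that normalization, assuming each job carries its tags as a string, a list, or None; `Job` is a hypothetical stand-in, not the module's job class.

```python
from dataclasses import dataclass
from typing import List, Union


def _as_set(tags):
    """Normalize None, a single tag, or an iterable of tags to a set."""
    if tags is None:
        return set()
    if isinstance(tags, str):
        return {tags}
    return set(tags)


@dataclass
class Job:  # hypothetical stand-in for a submitted job
    name: str
    tags: Union[str, List[str], None] = None


def filter_jobs(jobs, exclude_tags=None):
    """Drop every job that shares at least one tag with ``exclude_tags``."""
    excluded = _as_set(exclude_tags)
    return [job for job in jobs if not (_as_set(job.tags) & excluded)]
```

Treating a bare string as a single tag avoids the classic bug of iterating it character by character.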

CommandLine

xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.print_commands

Example

>>> from cmd_queue.tmux_queue import *  # NOQA
>>> self = TMUXMultiQueue(size=2, name='test-print-commands-tmux-queue')
>>> self.submit('echo hi 1', name='job1')
>>> self.submit('echo boilerplate job1', depends='job1', tags='boilerplate')
>>> self.submit('echo hi 2', log=False)
>>> self.submit('echo hi 3')
>>> self.submit('echo hi 4')
>>> self.submit('echo hi 5', log=False, name='job5')
>>> self.submit('echo boilerplate job2', depends='job5', tags='boilerplate')
>>> self.submit('echo hi 6', name='job6', depends='job5')
>>> self.submit('echo hi 7', name='job7', depends='job5')
>>> self.submit('echo boilerplate job3', depends=['job6', 'job7'], tags='boilerplate')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=1, with_gaurds=1, with_locks=1, style='rich')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, with_gaurds=1, with_locks=1, style='rich')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, with_gaurds=0, with_locks=0, style='rich')
>>> print('\n\n---\n\n')
>>> self.print_commands(with_status=0, with_gaurds=0, with_locks=0,
...             style='auto', exclude_tags='boilerplate')
current_output()
_print_commands()
_kill_commands()
capture()
kill()
_tmux_current_sessions()
cmd_queue.tmux_queue.has_stdin()