Source code for cmd_queue.tmux_queue

"""
A very simple queue based on tmux and bash

It should be possible to add more functionality, such as:

    - [x] A linear job queue - via one tmux shell

    - [x] Multiple linear job queues - via multiple tmux shells

    - [x] Ability to query status of jobs - the tmux script writes status to a
          file, and a secondary thread reads it.

    - [x] Unique identifier per queue

    - [ ] Central scheduler - given that we can know when a job is done
          a central scheduling process can run in the background, check
          the status of existing jobs, and spawn new jobs. --- Maybe not
          needed.

    - [X] Dependencies between jobs - given a central scheduler, it can
          only spawn a new job once its dependencies have been met.

    - [ ] GPU resource requirements - if a job indicates how much of a
          particular resource it needs, the scheduler can only schedule the
          next job if it "fits" given the resources taken by the currently
          running jobs.

    - [x] Duck typed API that uses Slurm if available. Slurm is a robust
          full featured queuing system. If it is available we should
          make it easy for the user to swap the tmux queue for slurm.

    - [x] Duck typed API that uses subprocesses. Tmux is not always available,
          so we can go even lighter weight and simply execute a subprocess that
          does the same thing as the linear queue. The downside is you don't
          get the nice tmux way of looking at the status of what the jobs are
          doing, but that doesn't matter in debugged automated workflows, and
          this does seem like a nice simple utility. It doesn't seem to exist
          elsewhere either, but my search terms might be wrong.

    - [ ] Handle the case where some jobs need the GPU and others do not

Example:
    >>> import cmd_queue
    >>> queue = cmd_queue.Queue.create(backend='tmux')
    >>> job1 = queue.submit('echo "Hello World" && sleep 0.1')
    >>> job2 = queue.submit('echo "Hello Kitty" && sleep 0.1', depends=[job1])
    >>> if queue.is_available():
    >>>     queue.run()

"""
import ubelt as ub
# import itertools as it
import uuid

from cmd_queue import base_queue
from cmd_queue import serial_queue
from cmd_queue.util.util_tmux import tmux


class TMUXMultiQueue(base_queue.Queue):
    """
    Create multiple sets of jobs to start in detached tmux sessions

    CommandLine:
        xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue:0
        xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue:2

    Example:
        >>> from cmd_queue.serial_queue import *  # NOQA
        >>> self = TMUXMultiQueue(1, 'test-tmux-queue')
        >>> job1 = self.submit('echo hi 1 && false')
        >>> job2 = self.submit('echo hi 2 && true')
        >>> job3 = self.submit('echo hi 3 && true', depends=job1)
        >>> self.print_commands()
        >>> self.print_graph()
        >>> if self.is_available():
        >>>     self.run(block=True, onexit='capture', check_other_sessions=0)

    Example:
        >>> from cmd_queue.tmux_queue import *  # NOQA
        >>> import random
        >>> rng = random.Random(54425367001)
        >>> self = TMUXMultiQueue(1, 'real-world-usecase', gpus=[0, 1])
        >>> def add_edge(name, depends):
        >>>     if name is not None:
        >>>         _depends = [self.named_jobs[n] for n in depends if n is not None]
        >>>         self.submit(f'echo name={name}, depends={depends} && sleep 0.1', name=name, depends=_depends)
        >>> def add_branch(suffix):
        >>>     f = 0.3
        >>>     pred = f'pred{suffix}' if rng.random() > f else None
        >>>     track = f'track{suffix}' if rng.random() > f else None
        >>>     actclf = f'actclf{suffix}' if rng.random() > f else None
        >>>     pxl_eval = f'pxl_eval{suffix}' if rng.random() > f else None
        >>>     trk_eval = f'trk_eval{suffix}' if rng.random() > f else None
        >>>     act_eval = f'act_eval{suffix}' if rng.random() > f else None
        >>>     add_edge(pred, [])
        >>>     add_edge(track, [pred])
        >>>     add_edge(actclf, [pred])
        >>>     add_edge(pxl_eval, [pred])
        >>>     add_edge(trk_eval, [track])
        >>>     add_edge(act_eval, [actclf])
        >>> for i in range(3):
        >>>     add_branch(str(i))
        >>> self.print_commands()
        >>> self.print_graph()
        >>> if self.is_available():
        >>>     self.run(block=1, onexit='', check_other_sessions=0)

    Example:
        >>> from cmd_queue.tmux_queue import TMUXMultiQueue
        >>> self = TMUXMultiQueue(size=2, name='foo')
        >>> print('self = {!r}'.format(self))
        >>> job1 = self.submit('echo hello && sleep 0.5')
        >>> job2 = self.submit('echo world && sleep 0.5', depends=[job1])
        >>> job3 = self.submit('echo foo && sleep 0.5')
        >>> job4 = self.submit('echo bar && sleep 0.5')
        >>> job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
        >>> job6 = self.submit('echo spam && sleep 0.5')
        >>> job7 = self.submit('echo err && false')
        >>> job8 = self.submit('echo spam && sleep 0.5')
        >>> job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
        >>> job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])
        >>> self.write()
        >>> self.print_commands()
        >>> if self.is_available():
        >>>     self.run(check_other_sessions=0)
        >>>     self.monitor()
        >>>     self.current_output()
        >>>     self.kill()

    Ignore:
        >>> from cmd_queue.tmux_queue import *  # NOQA
        >>> self = TMUXMultiQueue(2, 'foo', gpus=[0, 1])
        >>> job1 = self.submit('echo hello && sleep 0.5')
        >>> job2 = self.submit('echo hello && sleep 0.5')
        >>> self.print_commands()
        >>> # --
        >>> from cmd_queue.tmux_queue import *  # NOQA
        >>> self = TMUXMultiQueue(2, 'foo')
        >>> job1 = self.submit('echo hello && sleep 0.5')
        >>> self.sync()
        >>> job2 = self.submit('echo hello && sleep 0.5')
        >>> self.sync()
        >>> job3 = self.submit('echo hello && sleep 0.5')
        >>> self.sync()
        >>> self.print_commands()
        >>> # --
        >>> from cmd_queue.tmux_queue import *  # NOQA
        >>> self = TMUXMultiQueue(2, 'foo')
        >>> job1 = self.submit('echo hello && sleep 0.5')
        >>> job2 = self.submit('echo hello && sleep 0.5')
        >>> job3 = self.submit('echo hello && sleep 0.5')
        >>> self.print_commands()

    Example:
        >>> # Test complex failure case
        >>> from cmd_queue import Queue
        >>> self = Queue.create(size=2, name='demo-complex-failure', backend='tmux')
        >>> # Submit a binary tree that fails at different levels
        >>> for idx in range(2):
        >>>     # Level 0
        >>>     job1000 = self.submit('true')
        >>>     # Level 1
        >>>     job1100 = self.submit('true', depends=[job1000])
        >>>     job1200 = self.submit('false', depends=[job1000], name=f'false0_{idx}')
        >>>     # Level 2
        >>>     job1110 = self.submit('true', depends=[job1100])
        >>>     job1120 = self.submit('false', depends=[job1100], name=f'false1_{idx}')
        >>>     job1210 = self.submit('true', depends=[job1200])
        >>>     job1220 = self.submit('true', depends=[job1200])
        >>>     # Level 3
        >>>     job1111 = self.submit('true', depends=[job1110])
        >>>     job1112 = self.submit('false', depends=[job1110], name=f'false2_{idx}')
        >>>     job1121 = self.submit('true', depends=[job1120])
        >>>     job1122 = self.submit('true', depends=[job1120])
        >>>     job1211 = self.submit('true', depends=[job1210])
        >>>     job1212 = self.submit('true', depends=[job1210])
        >>>     job1221 = self.submit('true', depends=[job1220])
        >>>     job1222 = self.submit('true', depends=[job1220])
        >>> # Submit a chain that fails in the middle
        >>> chain1 = self.submit('true', name='chain1')
        >>> chain2 = self.submit('true', depends=[chain1], name='chain2')
        >>> chain3 = self.submit('false', depends=[chain2], name='chain3')
        >>> chain4 = self.submit('true', depends=[chain3], name='chain4')
        >>> chain5 = self.submit('true', depends=[chain4], name='chain5')
        >>> # Submit 4 loose passing jobs
        >>> for _ in range(4):
        >>>     self.submit('true', name=f'loose_true{_}')
        >>> # Submit 4 loose failing jobs
        >>> for _ in range(4):
        >>>     self.submit('false', name=f'loose_false{_}')
        >>> self.print_commands()
        >>> self.print_graph()
        >>> if self.is_available():
        >>>     self.run(with_textual=False, check_other_sessions=0)
    """

    def __init__(self, size=1, name=None, dpath=None, rootid=None,
                 environ=None, gpus=None, gres=None):
        super().__init__()
        if rootid is None:
            rootid = str(ub.timestamp().split('T')[0]) + '_' + ub.hash_data(uuid.uuid4())[0:8]
        if name is None:
            name = 'unnamed'
        self.name = name
        self.rootid = rootid
        self.pathid = '{}_{}'.format(self.name, self.rootid)
        if dpath is None:
            dpath = ub.Path.appdir('cmd_queue/tmux').ensuredir()
        self.dpath = (ub.Path(dpath) / self.pathid).ensuredir()
        if environ is None:
            environ = {}
        # Note: size can be changed as long as it happens before the queue is
        # written and run.
        if size <= 0:
            raise ValueError(f'tmux queue size must be positive, got size={size}')
        self.size = size
        self.environ = environ
        self.fpath = self.dpath / f'run_queues_{self.name}.sh'
        if gpus is None and gres is not None:
            gpus = gres
        self.gpus = gpus
        self.cmd_verbose = 2
        self.jobs = []
        self.header_commands = []
        self._tmux_session_prefix = 'cmdq_'
        self.job_info_dpath = self.dpath / 'job_info'
        self._new_workers()

    @classmethod
    def is_available(cls):
        """
        Determines if we can run the tmux queue or not.
        """
        return ub.find_exe('tmux')

    def _new_workers(self, start=0):
        import itertools as it
        per_worker_environs = [self.environ] * self.size
        if self.gpus:
            # TODO: more sophisticated GPU policy?
            per_worker_environs = [
                ub.dict_union(e, {
                    'CUDA_VISIBLE_DEVICES': str(cvd),
                })
                for cvd, e in zip(it.cycle(self.gpus), per_worker_environs)
            ]
        workers = [
            serial_queue.SerialQueue(
                name='{}{}_{:03d}'.format(self._tmux_session_prefix, self.name, worker_idx),
                rootid=self.rootid,
                dpath=self.dpath,
                environ=e
            )
            for worker_idx, e in enumerate(per_worker_environs, start=start)
        ]
        return workers

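    # NOTE: a hypothetical example of how _new_workers cycles GPUs: with
    # gpus=[0, 1] and size=3 the three SerialQueue workers would receive
    # CUDA_VISIBLE_DEVICES values of '0', '1', and '0' respectively.
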
    def __nice__(self):
        return ub.repr2(self.jobs)

    def _semaphore_wait_command(self, flag_fpaths, msg):
        r"""
        TODO: use flock?

        Ignore:
            # In queue 1
            flock /var/lock/lock1.lock python -c 'while True: print(".", end="")'

            # In queue 2
            flock /var/lock/lock2.lock python -c 'while True: print(".", end="")'

            # In queue 3
            flock /var/lock/lock1.lock echo "first lock finished" && \
            flock /var/lock/lock2.lock echo "second lock finished" && \
            python -c "print('this command depends on lock1 and lock2 procs completing')"

            flock /var/lock/lock2.lock echo "second lock finished"

            flock /var/lock/lock1.lock /var/lock/lock2.lock -c python -c 'while True: print("hi")'
        """
        # TODO: use inotifywait
        conditions = ['[ ! -f {} ]'.format(p) for p in flag_fpaths]
        condition = ' || '.join(conditions)
        # TODO: count number of files that exist
        command = ub.codeblock(
            f'''
            printf "{msg} "
            while {condition}; do sleep 1; done
            printf "finished {msg} "
            ''')
        return command

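    # NOTE: _semaphore_wait_command and _semaphore_signal_command implement a
    # simple file-based handshake between ranks. A hypothetical rendering of
    # the generated bash (the flag paths below are illustrative placeholders,
    # not literal output):
    #
    #     # emitted by _semaphore_wait_command([...], msg="wait for previous rank 0")
    #     printf "wait for previous rank 0 "
    #     while [ ! -f <dpath>/semaphores/rank_flag_0_0_2.done ] || [ ! -f <dpath>/semaphores/rank_flag_0_1_2.done ]; do sleep 1; done
    #     printf "finished wait for previous rank 0 "
    #
    #     # emitted by _semaphore_signal_command(<dpath>/semaphores/rank_flag_1_0_2.done)
    #     # Signal this worker is complete
    #     mkdir -p <dpath>/semaphores && touch <dpath>/semaphores/rank_flag_1_0_2.done
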
    def _semaphore_signal_command(self, flag_fpath):
        return ub.codeblock(
            f'''
            # Signal this worker is complete
            mkdir -p {flag_fpath.parent} && touch {flag_fpath}
            '''
        )

    def order_jobs(self):
        """
        TODO: ability to shuffle jobs subject to graph constraints

        Example:
            >>> from cmd_queue.tmux_queue import *  # NOQA
            >>> self = TMUXMultiQueue(5, 'foo')
            >>> job1a = self.submit('echo hello && sleep 0.5')
            >>> job1b = self.submit('echo hello && sleep 0.5')
            >>> job2a = self.submit('echo hello && sleep 0.5', depends=[job1a])
            >>> job2b = self.submit('echo hello && sleep 0.5', depends=[job1b])
            >>> job3 = self.submit('echo hello && sleep 0.5', depends=[job2a, job2b])
            >>> self.print_commands()
            self.run(block=True, check_other_sessions=0)

        Example:
            >>> from cmd_queue.tmux_queue import *  # NOQA
            >>> self = TMUXMultiQueue(5, 'foo')
            >>> job0 = self.submit('true')
            >>> job1 = self.submit('true')
            >>> job2 = self.submit('true', depends=[job0])
            >>> job3 = self.submit('true', depends=[job1])
            >>> #job2c = self.submit('true', depends=[job1a, job1b])
            >>> #self.sync()
            >>> job4 = self.submit('true', depends=[job2, job3, job1])
            >>> job5 = self.submit('true', depends=[job4])
            >>> job6 = self.submit('true', depends=[job4])
            >>> job7 = self.submit('true', depends=[job4])
            >>> job8 = self.submit('true', depends=[job5])
            >>> job9 = self.submit('true', depends=[job6])
            >>> job10 = self.submit('true', depends=[job6])
            >>> job11 = self.submit('true', depends=[job7])
            >>> job12 = self.submit('true', depends=[job10, job11])
            >>> job13 = self.submit('true', depends=[job4])
            >>> job14 = self.submit('true', depends=[job13])
            >>> job15 = self.submit('true', depends=[job4])
            >>> job16 = self.submit('true', depends=[job15, job13])
            >>> job17 = self.submit('true', depends=[job4])
            >>> job18 = self.submit('true', depends=[job17])
            >>> job19 = self.submit('true', depends=[job14, job16, job17])
            >>> self.print_graph(reduced=False)
            ...
            Graph:
            ╟── foo-job-0
            ╎   └─╼ foo-job-2
            ╎       └─╼ foo-job-4 ╾ foo-job-3, foo-job-1
            ╎           ├─╼ foo-job-5
            ╎           │   └─╼ foo-job-8
            ╎           ├─╼ foo-job-6
            ╎           │   ├─╼ foo-job-9
            ╎           │   └─╼ foo-job-10
            ╎           │       └─╼ foo-job-12 ╾ foo-job-11
            ╎           ├─╼ foo-job-7
            ╎           │   └─╼ foo-job-11
            ╎           │       └─╼ ...
            ╎           ├─╼ foo-job-13
            ╎           │   ├─╼ foo-job-14
            ╎           │   │   └─╼ foo-job-19 ╾ foo-job-16, foo-job-17
            ╎           │   └─╼ foo-job-16 ╾ foo-job-15
            ╎           │       └─╼ ...
            ╎           ├─╼ foo-job-15
            ╎           │   └─╼ ...
            ╎           └─╼ foo-job-17
            ╎               ├─╼ foo-job-18
            ╎               └─╼ ...
            ╙── foo-job-1
                ├─╼ foo-job-3
                │   └─╼ ...
                └─╼ ...
            >>> self.print_commands()
            >>> # self.run(block=True)

        Example:
            >>> from cmd_queue.tmux_queue import *  # NOQA
            >>> self = TMUXMultiQueue(2, 'test-order-case')
            >>> self.submit('echo slow1', name='slow1')
            >>> self.submit('echo fast1', name='fast1')
            >>> self.submit('echo slow2', name='slow2')
            >>> self.submit('echo fast2', name='fast2')
            >>> self.submit('echo slow3', name='slow3')
            >>> self.submit('echo fast3', name='fast3')
            >>> self.submit('echo slow4', name='slow4')
            >>> self.submit('echo fast4', name='fast4')
            >>> self.print_graph(reduced=False)
            >>> self.print_commands()
        """
        import networkx as nx
        from cmd_queue.util.util_network_text import write_network_text
        graph = self._dependency_graph()

        # Get rid of implicit dependencies
        try:
            reduced_graph = nx.transitive_reduction(graph)
        except Exception as ex:
            print('ex = {!r}'.format(ex))
            print('graph = {!r}'.format(graph))
            print(len(graph.nodes))
            print('graph.nodes = {}'.format(ub.repr2(graph.nodes, nl=1)))
            print('graph.edges = {}'.format(ub.repr2(graph.edges, nl=1)))
            print(len(graph.edges))
            print(graph.is_directed())
            print(nx.is_forest(graph))
            print(nx.is_directed_acyclic_graph(graph))
            simple_cycles = list(nx.cycles.simple_cycles(graph))
            print('simple_cycles = {}'.format(ub.repr2(simple_cycles, nl=1)))
            write_network_text(graph, print, end="")
            raise

        in_cut_nodes = set()
        out_cut_nodes = set()
        cut_edges = []
        for n in reduced_graph.nodes:
            # TODO: need to also check that the paths to a source node are
            # not unique, otherwise we don't need to cut the node, but extra
            # cuts won't matter, they just make it less efficient
            in_d = reduced_graph.in_degree[n]
            out_d = reduced_graph.out_degree[n]
            if in_d > 1:
                cut_edges.extend(list(reduced_graph.in_edges(n)))
                in_cut_nodes.add(n)
            if out_d > 1:
                cut_edges.extend(list(reduced_graph.out_edges(n)))
                out_cut_nodes.add(n)
        cut_graph = reduced_graph.copy()
        cut_graph.remove_edges_from(cut_edges)

        # Get all the node groups disconnected by the cuts
        condensed = nx.condensation(reduced_graph, nx.weakly_connected_components(cut_graph))

        # TODO: can we use nx.topological_generations for a more elegant
        # solution here?

        # Rank each condensed group, which defines
        # what order it is allowed to be executed in
        rankings = ub.ddict(set)
        condensed_order = list(nx.topological_sort(condensed))
        for c_node in condensed_order:
            members = set(condensed.nodes[c_node]['members'])
            ancestors = set(ub.flatten([nx.ancestors(reduced_graph, m) for m in members]))
            cut_in_ancestors = ancestors & in_cut_nodes
            cut_out_ancestors = ancestors & out_cut_nodes
            cut_in_members = members & in_cut_nodes
            rank = len(cut_in_members) + len(cut_out_ancestors) + len(cut_in_ancestors)
            for m in members:
                rankings[rank].update(members)

        if 0:
            from graphid.util import util_graphviz
            import kwplot
            kwplot.autompl()
            util_graphviz.show_nx(graph, fnum=1)
            util_graphviz.show_nx(reduced_graph, fnum=3)
            util_graphviz.show_nx(condensed, fnum=2)
            # cmd_queue.write_network_text(condensed, write=print)

        # Each rank defines a group that must itself be ordered
        # Ranks will execute sequentially, members within the
        # rank *might* be run in parallel
        ranked_job_groups = []
        for rank, group in sorted(rankings.items()):
            subgraph = graph.subgraph(group)
            # Only things that can run in parallel are disconnected components
            parallel_groups = []
            for wcc in list(nx.weakly_connected_components(subgraph)):
                sub_subgraph = subgraph.subgraph(wcc)
                wcc_order = list(nx.topological_sort(sub_subgraph))
                parallel_groups.append(wcc_order)
            # Ranked bins
            # Solve a bin packing problem to partition these into self.size groups
            from cmd_queue.util.util_algo import balanced_number_partitioning
            # Weighting by job heaviness would help here.
            group_weights = list(map(len, parallel_groups))
            groupxs = balanced_number_partitioning(group_weights, num_parts=self.size)
            rank_groups = [list(ub.take(parallel_groups, gxs)) for gxs in groupxs]
            rank_groups = [g for g in rank_groups if len(g)]

            # Reorder each group to better agree with submission order
            rank_jobs = []
            for group in rank_groups:
                priorities = []
                for nodes in group:
                    nodes_index = min(graph.nodes[n]['index'] for n in nodes)
                    priorities.append(nodes_index)
                final_queue_order = list(ub.flatten(ub.take(group, ub.argsort(priorities))))
                final_queue_jobs = [graph.nodes[n]['job'] for n in final_queue_order]
                rank_jobs.append(final_queue_jobs)
            ranked_job_groups.append(rank_jobs)

        if self.size == 1:
            # If we can only execute one command at a time we don't need to
            # split up the ranks, which means we don't need semaphores.
            serial_groups = []
            for rank_jobs in ranked_job_groups:
                serial_groups.extend(list(ub.flatten(rank_jobs)))
            ranked_job_groups = [[serial_groups]]

        queue_workers = []
        flag_dpath = (self.dpath / 'semaphores')
        prev_rank_flag_fpaths = None
        for rank, rank_jobs in enumerate(ranked_job_groups):
            # Hack, abuse init workers each time to construct workers
            workers = self._new_workers(start=len(queue_workers))
            rank_workers = []
            for worker, jobs in zip(workers, rank_jobs):
                # Add a dummy job to wait for dependencies of this linear queue
                if prev_rank_flag_fpaths:
                    command = self._semaphore_wait_command(
                        prev_rank_flag_fpaths,
                        msg=f"wait for previous rank {rank - 1}")
                    # Note: this should not be a real job
                    worker.submit(command, bookkeeper=1)

                for job in jobs:
                    # worker.submit(job.command)
                    worker.submit(job)
                rank_workers.append(worker)
            queue_workers.extend(rank_workers)

            # Add a dummy job at the end of each worker to signal finished
            rank_flag_fpaths = []
            num_rank_workers = len(rank_workers)
            for worker_idx, worker in enumerate(rank_workers):
                rank_flag_fpath = flag_dpath / f'rank_flag_{rank}_{worker_idx}_{num_rank_workers}.done'
                command = self._semaphore_signal_command(rank_flag_fpath)
                # Note: this should not be a real job
                worker.submit(command, bookkeeper=1)
                rank_flag_fpaths.append(rank_flag_fpath)
            prev_rank_flag_fpaths = rank_flag_fpaths

        # Overwrite workers with our new dependency aware workers
        for worker in queue_workers:
            for header_command in self.header_commands:
                worker.add_header_command(header_command)
        self.workers = queue_workers

    def add_header_command(self, command):
        """
        Adds a header command run at the start of each queue
        """
        self.header_commands.append(command)

    def finalize_text(self, **kwargs):
        self.order_jobs()
        # Create a driver script
        driver_lines = [ub.codeblock(
            f'''
            #!/bin/bash
            # Driver script to start the tmux-queue
            echo "Submitting {self.num_real_jobs} jobs to a tmux queue"
            ''')]
        for queue in self.workers:
            # run_command_in_tmux_queue(command, name)
            # TODO: figure out how to forward environment variables from the
            # running sessions. We don't want to log secrets to plaintext.
            part = ub.codeblock(
                f'''
                ### Run Queue: {queue.pathid} with {len(queue)} jobs
                tmux new-session -d -s {queue.pathid} "bash"
                tmux send -t {queue.pathid} \\
                    "source {queue.fpath}" \\
                    Enter
                ''').format()
            driver_lines.append(part)
        driver_lines += [f'echo "Spread jobs across {len(self.workers)} tmux workers"']
        driver_text = '\n\n'.join(driver_lines)
        return driver_text

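    # For reference, the driver text that finalize_text returns looks roughly
    # like the following (worker pathids/fpaths are placeholders filled in from
    # each SerialQueue worker; one tmux block is emitted per worker):
    #
    #     #!/bin/bash
    #     # Driver script to start the tmux-queue
    #     echo "Submitting 3 jobs to a tmux queue"
    #
    #     ### Run Queue: <worker.pathid> with 3 jobs
    #     tmux new-session -d -s <worker.pathid> "bash"
    #     tmux send -t <worker.pathid> \
    #         "source <worker.fpath>" \
    #         Enter
    #
    #     echo "Spread jobs across 1 tmux workers"
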
    def write(self):
        self.order_jobs()
        for queue in self.workers:
            queue.write()
        super().write()

    def kill_other_queues(self, ask_first=True):
        """
        Find other tmux sessions that look like they were started with
        cmd_queue and kill them.
        """
        import parse
        queue_name_pattern = parse.Parser(self._tmux_session_prefix + '{name}_{rootid}')
        current_sessions = self._tmux_current_sessions()
        other_session_ids = []
        for info in current_sessions:
            matched = queue_name_pattern.parse(info['id'])
            if matched is not None:
                if self.name == matched['name']:
                    other_session_ids.append(info['id'])
        # print(f'other_session_ids={other_session_ids}')
        if other_session_ids:
            print(f'Detected {len(other_session_ids)} other running cmd-queue sessions with the same name')
            print('Commands to kill them:')
            kill_commands = []
            for sess_id in other_session_ids:
                command2 = f'tmux kill-session -t {sess_id}'
                print(command2)
                kill_commands.append(command2)
            from rich import prompt
            if not ask_first or prompt.Confirm().ask('Do you want to kill the other sessions?'):
                for command in kill_commands:
                    ub.cmd(command, verbose=self.cmd_verbose)

    def handle_other_sessions(self, other_session_handler):
        if other_session_handler == 'auto':
            from cmd_queue.tmux_queue import has_stdin
            if has_stdin():
                other_session_handler = 'ask'
            else:
                other_session_handler = 'kill'  # default headless behavior
        if other_session_handler == 'ask':
            self.kill_other_queues(ask_first=True)
        elif other_session_handler == 'kill':
            self.kill_other_queues(ask_first=False)
        elif other_session_handler == 'ignore':
            ...
        else:
            raise KeyError(other_session_handler)

    def run(self, block=True, onfail='kill', onexit='', system=False,
            with_textual='auto', check_other_sessions=None,
            other_session_handler='auto', **kw):
        """
        Execute the queue.

        Args:
            other_session_handler (str):
                How to handle potentially conflicting existing tmux runners
                with the same queue name. Can be 'kill', 'ask', 'ignore', or
                'auto', which defaults to 'ask' if stdin is available and
                'kill' if it is not.
        """
        if not self.is_available():
            raise Exception('tmux not found')

        # TODO: need to port or generalize some of this logic to serial /
        # slurm queues.
        self.handle_other_sessions(other_session_handler)

        if check_other_sessions:
            ub.schedule_deprecation(
                'tmux_queue', 'check_other_sessions', 'argument')
            if check_other_sessions == 'auto':
                if not has_stdin():
                    check_other_sessions = False
            if check_other_sessions:
                self.kill_other_queues(ask_first=True)

        self.write()
        ub.cmd(f'bash {self.fpath}', verbose=self.cmd_verbose, check=True,
               system=system)
        if block:
            agg_state = self.monitor(with_textual=with_textual)
            if onexit == 'capture':
                self.capture()
            if not agg_state['failed']:
                if onfail == 'kill':
                    self.kill()
            return agg_state

    def read_state(self):
        agg_state = {}
        worker_states = []
        for worker in self.workers:
            state = worker.read_state()
            worker_states.append(state)
        agg_state['worker_states'] = worker_states
        try:
            agg_state['total'] = sum(s['total'] for s in worker_states)
            agg_state['failed'] = sum(s['failed'] for s in worker_states)
            agg_state['passed'] = sum(s['passed'] for s in worker_states)
            agg_state['skipped'] = sum(s['skipped'] for s in worker_states)
            agg_state['rootid'] = ub.peek(s['rootid'] for s in worker_states)
            states = set(s['status'] for s in worker_states)
            agg_state['status'] = 'done' if states == {'done'} else 'not-done'
        except Exception:
            pass
        return agg_state

    def serial_run(self):
        """
        Hack to run everything without tmux.

        This really should be a different "queue" backend. See Serial Queue
        instead.
        """
        # deprecate: use serial queue instead
        self.order_jobs()
        queue_fpaths = []
        for queue in self.workers:
            fpath = queue.write()
            queue_fpaths.append(fpath)
        for fpath in queue_fpaths:
            ub.cmd(f'{fpath}', verbose=self.cmd_verbose, check=True)

    def monitor(self, refresh_rate=0.4, with_textual='auto'):
        """
        Monitor progress until the jobs are done

        CommandLine:
            xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:0
            xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:1 --interact

        Example:
            >>> # xdoctest: +REQUIRES(--interact)
            >>> from cmd_queue.tmux_queue import *  # NOQA
            >>> self = TMUXMultiQueue(size=3, name='test-queue-monitor')
            >>> job = None
            >>> for i in range(10):
            >>>     job = self.submit('sleep 2 && echo "hello 2"', depends=job)
            >>> job = None
            >>> for i in range(10):
            >>>     job = self.submit('sleep 3 && echo "hello 2"', depends=job)
            >>> job = None
            >>> for i in range(5):
            >>>     job = self.submit('sleep 5 && echo "hello 2"', depends=job)
            >>> self.print_commands()
            >>> if self.is_available():
            >>>     self.run(block=True, check_other_sessions=0)

        Example:
            >>> # xdoctest: +REQUIRES(env:INTERACTIVE_TEST)
            >>> from cmd_queue.tmux_queue import *  # NOQA
            >>> # Setup a lot of longer running jobs
            >>> n = 2
            >>> self = TMUXMultiQueue(size=n, name='demo_cmd_queue')
            >>> first_job = None
            >>> for i in range(n):
            ...     prev_job = None
            ...     for j in range(4):
            ...         command = f'sleep 1 && echo "This is job {i}.{j}"'
            ...         job = self.submit(command, depends=prev_job)
            ...         prev_job = job
            ...     first_job = first_job or job
            >>> command = f'sleep 1 && echo "this is the last job"'
            >>> job = self.submit(command, depends=[prev_job, first_job])
            >>> self.print_commands(style='rich')
            >>> self.print_graph()
            >>> if self.is_available():
            ...     self.run(block=True, other_session_handler='kill')
        """
        # print('Start monitor')
        if with_textual == 'auto':
            with_textual = CmdQueueMonitorApp is not None
            # If we don't have stdin (i.e. running in pytest) we can't use
            # textual.
            if not has_stdin():
                with_textual = False

        if with_textual:
            self._textual_monitor()
        else:
            self._simple_rich_monitor(refresh_rate)

        table, finished, agg_state = self._build_status_table()
        return agg_state

    def _textual_monitor(self):
        from rich import print as rich_print
        if 0:
            print('Kill commands:')
            for command in self._kill_commands():
                print(command)
        is_running = True
        while is_running:
            table_fn = self._build_status_table
            app = CmdQueueMonitorApp(table_fn, kill_fn=self.kill)
            app.run()
            table, finished, agg_state = self._build_status_table()
            rich_print(table)
            if app.graceful_exit:
                is_running = False
            else:
                from rich.prompt import Confirm
                flag = Confirm.ask('Do you want to kill the procs?')
                if flag:
                    self.kill()
                    is_running = False

    def _simple_rich_monitor(self, refresh_rate=0.4):
        import time
        from rich.live import Live
        if 0:
            print('Kill commands:')
            for command in self._kill_commands():
                print(command)
        try:
            table, finished, agg_state = self._build_status_table()
            with Live(table, refresh_per_second=4) as live:
                while not finished:
                    time.sleep(refresh_rate)
                    table, finished, agg_state = self._build_status_table()
                    live.update(table)
        except KeyboardInterrupt:
            from rich.prompt import Confirm
            flag = Confirm.ask('Do you want to kill the procs?')
            if flag:
                self.kill()

    def _build_status_table(self):
        from rich.table import Table
        # https://rich.readthedocs.io/en/stable/live.html
        table = Table()
        columns = ['tmux session name', 'status', 'passed', 'failed', 'skipped', 'total']
        for col in columns:
            table.add_column(col)

        finished = True
        agg_state = {
            'name': 'agg',
            'status': '',
            'failed': 0,
            'passed': 0,
            'skipped': 0,
            'total': 0
        }

        for worker in self.workers:
            pass_color = ''
            fail_color = ''
            skip_color = ''
            state = worker.read_state()
            if state['status'] == 'unknown':
                finished = False
                pass_color = '[yellow]'
            else:
                finished &= (state['status'] == 'done')
                if state['status'] == 'done':
                    pass_color = '[green]'
                if (state['failed'] > 0):
                    fail_color = '[red]'
                if (state['skipped'] > 0):
                    skip_color = '[yellow]'

            agg_state['total'] += state['total']
            agg_state['passed'] += state['passed']
            agg_state['failed'] += state['failed']
            agg_state['skipped'] += state['skipped']

            table.add_row(
                state['name'],
                state['status'],
                f"{pass_color}{state['passed']}",
                f"{fail_color}{state['failed']}",
                f"{skip_color}{state['skipped']}",
                f"{state['total']}",
            )

        if not finished:
            agg_state['status'] = 'run'
        else:
            agg_state['status'] = 'done'

        if len(self.workers) > 1:
            table.add_row(
                agg_state['name'],
                agg_state['status'],
                f"{agg_state['passed']}",
                f"{agg_state['failed']}",
                f"{agg_state['skipped']}",
                f"{agg_state['total']}",
            )
        return table, finished, agg_state

    def print_commands(self, *args, **kwargs):
        r"""
        Print info about the commands, optionally with rich

        Args:
            with_status (bool): tmux / serial only, show bash status boilerplate
            with_gaurds (bool): tmux / serial only, show bash guards boilerplate
            with_locks (bool): tmux, show tmux lock boilerplate
            exclude_tags (List[str] | None): if specified, exclude jobs submitted with these tags.
            style (str): can be 'colors', 'rich', or 'plain'
            **kwargs: extra backend-specific args passed to finalize_text

        CommandLine:
            xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.print_commands

        Example:
            >>> from cmd_queue.tmux_queue import *  # NOQA
            >>> self = TMUXMultiQueue(size=2, name='test-print-commands-tmux-queue')
            >>> self.submit('echo hi 1', name='job1')
            >>> self.submit('echo boilerplate job1', depends='job1', tags='boilerplate')
            >>> self.submit('echo hi 2', log=False)
            >>> self.submit('echo hi 3')
            >>> self.submit('echo hi 4')
            >>> self.submit('echo hi 5', log=False, name='job5')
            >>> self.submit('echo boilerplate job2', depends='job5', tags='boilerplate')
            >>> self.submit('echo hi 6', name='job6', depends='job5')
            >>> self.submit('echo hi 7', name='job7', depends='job5')
            >>> self.submit('echo boilerplate job3', depends=['job6', 'job7'], tags='boilerplate')
            >>> print('\n\n---\n\n')
            >>> self.print_commands(with_status=1, with_gaurds=1, with_locks=1, style='rich')
            >>> print('\n\n---\n\n')
            >>> self.print_commands(with_status=0, with_gaurds=1, with_locks=1, style='rich')
            >>> print('\n\n---\n\n')
            >>> self.print_commands(with_status=0, with_gaurds=0, with_locks=0, style='rich')
            >>> print('\n\n---\n\n')
            >>> self.print_commands(with_status=0, with_gaurds=0, with_locks=0,
            ...                     style='auto', exclude_tags='boilerplate')
        """
        self.order_jobs()
        for queue in self.workers:
            queue.print_commands(*args, **kwargs)
        super().print_commands(*args, **kwargs)

    def current_output(self):
        for queue in self.workers:
            print('\n\nqueue = {!r}'.format(queue))
            # First print out the contents for debug
            tmux.capture_pane(target_session=queue.pathid, verbose=self.cmd_verbose)

    def _print_commands(self):
        # First print out the contents for debug
        for queue in self.workers:
            command1 = tmux._capture_pane_command(target_session=queue.pathid)
            yield command1

    def _kill_commands(self):
        for queue in self.workers:
            command2 = tmux._kill_session_command(target_session=queue.pathid)
            yield command2

    def capture(self):
        for command in self._print_commands():
            ub.cmd(command, verbose=self.cmd_verbose)

    def kill(self):
        # Kills all the tmux panes
        for command in self._kill_commands():
            ub.cmd(command, verbose=self.cmd_verbose)

    def _tmux_current_sessions(self):
        sessions = tmux.list_sessions()
        return sessions


def has_stdin():
    import sys
    try:
        sys.stdin.fileno()
    except Exception:
        return False
    else:
        return True


try:
    import textual  # NOQA
    from cmd_queue.monitor_app import CmdQueueMonitorApp
    if not hasattr(CmdQueueMonitorApp, 'run'):
        raise ImportError('Current textual monitor is broken on new versions')
except ImportError:
    CmdQueueMonitorApp = None


if 0:
    __tmux_notes__ = """
    # Useful tmux commands

    tmux list-commands

    tmux new-session -d -s {queue.pathid} "bash"
    tmux send -t {queue.pathid} "source {queue.fpath}" Enter

    tmux new-session -d -s my_session_id "bash"

    # References:
    # https://stackoverflow.com/questions/20701757/tmux-setting-environment-variables-for-sessions
    # Requires tmux 3.2
    export MYSECRET=12345
    tmux new-session -d -s my_session_id "bash"
    tmux set -t my_session_id update-environment MYSECRET

    tmux list-sessions
    tmux list-panes -a
    tmux list-windows -a

    # This can query the content of the current pane
    tmux capture-pane -p -t "my_session_id:0.0"

    tmux attach-session -t my_session_id
    tmux kill-session -t my_session_id

    tmux list-windows -t my_session_id

    tmux capture-pane -t my_session_id
    tmux capture-pane --help -t my_session_id

    # Example of passing environment variables (but does not use new-session)
    export MYVAR1=123
    export MYVAR2=456
    tmux -L MYVAR1 -L MYVAR2
    echo $MYVAR1
    echo $MYVAR2

    # References
    https://unix.stackexchange.com/questions/743817/how-to-start-tmux-in-a-way-that-it-inherits-all-environment-variables-from-the-c
    https://stackoverflow.com/questions/20701757/tmux-setting-environment-variables-for-sessions
    https://github.com/orgs/tmux/discussions/3659

    # Can start a new session with a specific environment variable
    export MYVAR1=123
    tmux new-session -d -s my_session_id -e "MYVAR1=$MYVAR1" -- "bash"

    # Show the environment of the new session
    tmux show-env -t my_session_id
    tmux ls
    tmux kill-session -t my_session_id

    tmux new-session -d -s my_session_id -e "MYVAR1" -- "bash"
    """