Source code for cmd_queue.serial_queue

"""
References:
    https://jmmv.dev/2018/03/shell-readability-strict-mode.html
    https://stackoverflow.com/questions/13195655/bash-set-x-without-it-being-printed
"""
import ubelt as ub
import uuid
from cmd_queue import base_queue
from cmd_queue.util import util_tags


[docs]def indent(text, prefix=' '): r""" Indents a block of text Args: text (str): text to indent prefix (str, default = ' '): prefix to add to each line Returns: str: indented text >>> from cmd_queue.serial_queue import * # NOQA >>> text = ['aaaa', 'bb', 'cc\n dddd\n ef\n'] >>> text = indent(text) >>> print(text) >>> text = indent(text) >>> print(text) """ if isinstance(text, (list, tuple)): return indent('\n'.join(text), prefix) else: return prefix + text.replace('\n', '\n' + prefix)
[docs]class BashJob(base_queue.Job): r""" A job meant to run inside of a larger bash file. Analog of SlurmJob Attributes: name (str): a name for this job pathid (str): a unique id based on the name and a hashed uuid command (str): the shell command to run depends (List[BashJob] | None): the jobs that this job depends on. This job will only run once all the dependencies have succesfully run. bookkeeper (bool): flag indicating if this is a bookkeeping job or not info_dpath (PathLike | None): where information about this job will be stored log (bool): if True, output of the job will be tee-d and saved to a file, this can have interactions with normal stdout. Defaults to False. tags (List[str] | str | None): a list of strings that can be used to group jobs or filter the queue or other custom purposes. allow_indent (bool): In some cases indentation matters for the shell command. In that case ensure this is False at the cost of readability in the result script. TODO: - [ ] What is a good name for a a list of jobs that must fail for this job to run. Our current depends in analogous to slurm's afterok. What is a good variable name for afternotok? Do we wrap the job with some sort of negation, so we depend on the negation of the job? CommandLine: xdoctest -m cmd_queue.serial_queue BashJob Example: >>> from cmd_queue.serial_queue import * # NOQA >>> # Demo full boilerplate for a job with no dependencies >>> self = BashJob('echo hi', 'myjob') >>> self.print_commands(1, 1) Example: >>> from cmd_queue.serial_queue import * # NOQA >>> # Demo full boilerplate for a job with dependencies >>> dep = BashJob('echo hi', name='job1') >>> conditionals = {'on_skip': ['echo "CUSTOM MESSAGE FOR WHEN WE SKIP A JOB"']} >>> self = BashJob('echo hi', name='job2', depends=[dep]) >>> self.print_commands(1, 1, conditionals=conditionals) """ def __init__(self, command, name=None, depends=None, gpus=None, cpus=None, mem=None, bookkeeper=0, info_dpath=None, log=False, tags=None, allow_indent=True, **kwargs): if depends is not None and not ub.iterable(depends): depends = [depends] self.name = name self.pathid = self.name + '_' + ub.hash_data(uuid.uuid4())[0:8] self.kwargs = kwargs # unused kwargs self.command = command self.depends: list[base_queue.Job] = depends self.bookkeeper = bookkeeper self.log = log if info_dpath is None: info_dpath = ub.Path.appdir('cmd_queue/jobinfos/') / self.pathid self.info_dpath = info_dpath self.pass_fpath = self.info_dpath / f'passed/{self.pathid}.pass' self.fail_fpath = self.info_dpath / f'failed/{self.pathid}.fail' self.stat_fpath = self.info_dpath / f'status/{self.pathid}.stat' self.log_fpath = self.info_dpath / f'status/{self.pathid}.logs' self.tags = util_tags.Tags.coerce(tags) self.allow_indent = allow_indent
[docs] def finalize_text(self, with_status=True, with_gaurds=True, conditionals=None, **kwargs): script = [] prefix_script = [] suffix_script = [] if with_status: # Base conditionals _job_conditionals = { # when the job runs and succeedes 'on_pass': [ f'mkdir -p {self.pass_fpath.parent}', f'printf "pass" > {self.pass_fpath}', ], # when the job fails or does not run 'on_fail': [ f'mkdir -p {self.fail_fpath.parent}', f'printf "fail" > {self.fail_fpath}', ], # when dependencies are unmet 'on_skip': [ ] } # Append custom conditionals if conditionals: for k, v in _job_conditionals.items(): if k in conditionals: v2 = conditionals.get(k) if not ub.iterable(v2): v2 = [v2] v.extend(v2) if with_status: prefix_script.append('# Ensure job status directory') prefix_script.append(f'mkdir -p {self.stat_fpath.parent}') had_conditions = False if with_status: if self.depends: # Dont allow us to run if any dependencies have failed conditions = [] for dep in self.depends: if dep is not None: conditions.append(f'[ -f {dep.pass_fpath} ]') # TODO: if we add the ability to depend on jobs failing then # add those conditions here. if conditions: had_conditions = True condition = ' && '.join(conditions) prefix_script.append(f'if {condition}; then') if with_status: script.append('# before_command:') # import shlex json_fmt_parts = [ ('ret', '%s', 'null'), ('name', '"%s"', self.name), # ('command', '"%s"', shlex.quote(self.command)), ] if self.log: json_fmt_parts += [ ('logs', '"%s"', self.log_fpath), ] dump_pre_status = _bash_json_dump(json_fmt_parts, self.stat_fpath) script.append('# Mark job as running') script.append(dump_pre_status) if with_gaurds and not self.bookkeeper: # -x Tells bash to print the command before it executes it # +e tells bash to allow the command to fail if self.log: # https://stackoverflow.com/questions/6871859/piping-command-output-to-tee-but-also-save-exit-code-of-command script.append('set -o pipefail') script.append('# Disable exit-on-error, enable command echo') script.append('set +e -x') if with_status: # script.append('# </before_command> ') # script.append('# <command> ') script.append('# ********') script.append('# command:') if self.log and with_status: logged_command = f'({self.command}) 2>&1 | tee {self.log_fpath}' script.append(logged_command) else: script.append(self.command) if with_status: script.append('# ********') if with_status: # script.append('# </command> ') # script.append('# <after_command> ') script.append('# after_command:') if with_gaurds: # Tells bash to stop printing commands, but is clever in that it # captures the last return code and doesnt print this command. # Also set -e so our boilerplate is not allowed to fail script.append('# Capture job return code, disable command echo, enable exit-on-error') script.append('{ RETURN_CODE=$? ; set +x -e; } 2>/dev/null') if self.log: script.append('set +o pipefail') else: if with_status: script.append('# Capture job return code') script.append('RETURN_CODE=$?') if had_conditions: suffix_script.append('else') if _job_conditionals['on_skip']: on_skip_part = indent(_job_conditionals['on_skip']) suffix_script.append(on_skip_part) suffix_script.append(' RETURN_CODE=126') suffix_script.append('fi') if self.allow_indent: script = prefix_script + [indent(script)] + suffix_script else: script = prefix_script + script + suffix_script else: script = prefix_script + script + suffix_script if with_status: # import shlex json_fmt_parts = [ ('ret', '%s', '$RETURN_CODE'), ('name', '"%s"', self.name), # ('command', '"%s"', shlex.quote(self.command)), ] if self.log: json_fmt_parts += [ ('logs', '"%s"', self.log_fpath), ] dump_post_status = _bash_json_dump(json_fmt_parts, self.stat_fpath) on_pass_part = indent(_job_conditionals['on_pass']) on_fail_part = indent(_job_conditionals['on_fail']) conditional_body = '\n'.join([ 'if [[ "$RETURN_CODE" == "0" ]]; then', on_pass_part, 'else', on_fail_part, 'fi' ]) script.append('# Mark job as stopped') script.append(dump_post_status) script.append(conditional_body) # script.append('# </after_command> ') assert isinstance(script, list) text = '\n'.join(script) return text
[docs] def print_commands(self, with_status=False, with_gaurds=False, with_rich=None, style='colors', **kwargs): r""" Print info about the commands, optionally with rich Args: with_status (bool): tmux / serial only, show bash status boilerplate with_gaurds (bool): tmux / serial only, show bash guards boilerplate with_locks (bool): tmux, show tmux lock boilerplate exclude_tags (List[str] | None): if specified exclude jobs submitted with these tags. style (str): can be 'colors', 'rich', or 'plain' **kwargs: extra backend-specific args passed to finalize_text CommandLine: xdoctest -m cmd_queue.serial_queue BashJob.print_commands Example: >>> from cmd_queue.serial_queue import * # NOQA >>> self = SerialQueue('test-print-commands-serial-queue') >>> self.submit('echo hi 1') >>> self.submit('echo hi 2') >>> print('\n\n---\n\n') >>> self.print_commands(with_status=1, with_gaurds=1, style='rich') >>> print('\n\n---\n\n') >>> self.print_commands(with_status=0, with_gaurds=1, style='rich') >>> print('\n\n---\n\n') >>> self.print_commands(with_status=0, with_gaurds=0, style='rich') """ style = base_queue.Queue._coerce_style(self, style, with_rich) code = self.finalize_text(with_status=with_status, with_gaurds=with_gaurds, **kwargs) if style == 'rich': from rich.syntax import Syntax from rich.console import Console console = Console() console.print(Syntax(code, 'bash')) elif style == 'colors': print(ub.highlight_code(code, 'bash')) elif style == 'plain': print(code) else: raise KeyError(f'Unknown style={style}')
[docs]class SerialQueue(base_queue.Queue): r""" A linear job queue written to a single bash file Example: >>> from cmd_queue.serial_queue import * # NOQA >>> self = SerialQueue('test-serial-queue', rootid='test-serial') >>> job1 = self.submit('echo "this job fails" && false') >>> job2 = self.submit('echo "this job works" && true') >>> job3 = self.submit('echo "this job wont run" && true', depends=job1) >>> self.print_commands(1, 1) >>> self.run() >>> state = self.read_state() >>> print('state = {}'.format(ub.repr2(state, nl=1))) Example: >>> # Test case where a job fails >>> from cmd_queue.serial_queue import * # NOQA >>> self = SerialQueue('test-serial-queue', rootid='test-serial') >>> job1 = self.submit('echo "job1 fails" && false') >>> job2 = self.submit('echo "job2 never runs"', depends=[job1]) >>> job3 = self.submit('echo "job3 never runs"', depends=[job2]) >>> job4 = self.submit('echo "job4 passes" && true') >>> job5 = self.submit('echo "job5 fails" && false', depends=[job4]) >>> job6 = self.submit('echo "job6 never runs"', depends=[job5]) >>> job7 = self.submit('echo "job7 never runs"', depends=[job4, job2]) >>> job8 = self.submit('echo "job8 never runs"', depends=[job4, job1]) >>> self.print_commands(1, 1) >>> self.run() >>> self.read_state() """ def __init__(self, name='', dpath=None, rootid=None, environ=None, cwd=None, **kwargs): super().__init__() if rootid is None: rootid = str(ub.timestamp().split('T')[0]) + '_' + ub.hash_data(uuid.uuid4())[0:8] self.name = name self.rootid = rootid if dpath is None: dpath = ub.Path.appdir('cmd_queue/serial', self.pathid).ensuredir() self.dpath = ub.Path(dpath) self.unused_kwargs = kwargs self.fpath = self.dpath / (self.pathid + '.sh') self.state_fpath = self.dpath / 'serial_queue_{}.txt'.format(self.pathid) self.environ = environ self.header = '#!/bin/bash' self.header_commands = [] self.jobs = [] self.cwd = cwd self.job_info_dpath = self.dpath / 'job_info' @property def pathid(self): """ A path-safe identifier for file names """ return '{}_{}'.format(self.name, self.rootid) def __nice__(self): return f'{self.pathid} - {self.num_real_jobs}'
[docs] @classmethod def is_available(cls): """ This queue is always available. """ # TODO: get this working return True
[docs] def order_jobs(self): """ Ensure jobs within a serial queue are topologically ordered. Attempts to preserve input ordering. """ # We need to ensure the jobs are in a topologoical order here. import networkx as nx graph = self._dependency_graph() original_order = [j.name for j in self.jobs] from cmd_queue.util import util_networkx if not util_networkx.is_topological_order(graph, original_order): # If not already topologically sorted, try to make the minimal # reordering to achieve it. # FIXME: I think this is not a minimal reordering. topo_generations = list(nx.topological_generations(graph)) new_order = [] original_order = ub.oset(original_order) for gen in topo_generations: new_order.extend(original_order & gen) self.jobs = [self.named_jobs[n] for n in new_order]
[docs] def finalize_text(self, with_status=True, with_gaurds=True, with_locks=True, exclude_tags=None): """ Create the bash script that will: 1. Run all of the jobs in this queue. 2. Track the results. 3. Prevent jobs with unmet dependencies from running. """ import cmd_queue self.order_jobs() script = [self.header] script += ['# Written by cmd_queue {}'.format(cmd_queue.__version__)] total = self.num_real_jobs if with_gaurds: script.append('set -e') if with_status: script.append(ub.codeblock( f''' # Init state to keep track of job progress (( "_CMD_QUEUE_NUM_FAILED=0" )) || true (( "_CMD_QUEUE_NUM_PASSED=0" )) || true (( "_CMD_QUEUE_NUM_SKIPPED=0" )) || true _CMD_QUEUE_TOTAL={total} _CMD_QUEUE_STATUS="" ''')) old_status = None def _mark_status(status): nonlocal old_status # be careful with json formatting here if with_status: if old_status != status: script.append(ub.codeblock( ''' _CMD_QUEUE_STATUS="{}" ''').format(status)) old_status = status # Name, format-string, and value for json status json_fmt_parts = [ ('status', '"%s"', '$_CMD_QUEUE_STATUS'), ('passed', '%d', '$_CMD_QUEUE_NUM_PASSED'), ('failed', '%d', '$_CMD_QUEUE_NUM_FAILED'), ('skipped', '%d', '$_CMD_QUEUE_NUM_SKIPPED'), ('total', '%d', '$_CMD_QUEUE_TOTAL'), ('name', '"%s"', self.name), ('rootid', '"%s"', self.rootid), ] dump_code = _bash_json_dump(json_fmt_parts, self.state_fpath) script.append('# Update queue status') script.append(dump_code) # script.append('cat ' + str(self.state_fpath)) def _command_enter(): if with_gaurds: # Tells bash to print the command before it executes it script.append('set -x') def _command_exit(): if with_gaurds: script.append('{ set +x; } 2>/dev/null') else: if with_status: script.append('RETURN_CODE=$?') _mark_status('init') if self.environ: script.append('#') script.append('# Environment') _mark_status('set_environ') if with_gaurds: _command_enter() script.extend([ f'export {k}="{v}"' for k, v in self.environ.items()]) if with_gaurds: _command_exit() if self.cwd: script.append('#') script.append('# Working Directory') script.append(f'cd {self.cwd}') if self.header_commands: script.append('#') script.append('# Header commands') for command in self.header_commands: _command_enter() script.append(command) _command_exit() if self.jobs: script.append('') script.append('# ----') script.append('# Jobs') script.append('# ----') script.append('') exclude_tags = util_tags.Tags.coerce(exclude_tags) num = 0 for job in self.jobs: if exclude_tags and exclude_tags.intersection(job.tags): continue if job.bookkeeper: if with_locks: script.append(job.finalize_text(with_status, with_gaurds)) else: if with_status: script.append('') script.append('#') script.append('# <job>') _mark_status('run') script.append(ub.codeblock( ''' # ### Command {} / {} - {} ''').format(num + 1, total, job.name)) conditionals = { 'on_pass': '(( "_CMD_QUEUE_NUM_PASSED=_CMD_QUEUE_NUM_PASSED+1" )) || true', 'on_fail': '(( "_CMD_QUEUE_NUM_FAILED=_CMD_QUEUE_NUM_FAILED+1" )) || true', 'on_skip': '(( "_CMD_QUEUE_NUM_SKIPPED=_CMD_QUEUE_NUM_SKIPPED+1" )) || true', } script.append(job.finalize_text(with_status, with_gaurds, conditionals)) if with_status: script.append('# </job>') script.append('#') script.append('') num += 1 _mark_status('done') # Print summary of status at the end. if with_status: script.append('# Display final status of this serial queue') script.append('echo "Command Queue Final Status:"') script.append(f'cat "{self.state_fpath}"') pass if with_gaurds: script.append('set +e') text = '\n'.join(script) return text
[docs] def add_header_command(self, command): self.header_commands.append(command)
[docs] def print_commands(self, *args, **kwargs): r""" Print info about the commands, optionally with rich CommandLine: xdoctest -m cmd_queue.serial_queue SerialQueue.print_commands Example: >>> from cmd_queue.serial_queue import * # NOQA >>> self = SerialQueue('test-serial-queue') >>> self.submit('echo hi 1') >>> self.submit('echo hi 2') >>> self.submit('echo boilerplate', tags='boilerplate') >>> self.print_commands(with_status=True) >>> print('\n\n---\n\n') >>> self.print_commands(with_status=0, exclude_tags='boilerplate') """ return super().print_commands(*args, **kwargs)
# (self, with_status=False, with_gaurds=False, # with_rich=None, colors=1, with_locks=True, # exclude_tags=None, style='auto'): # style = self._coerce_style(style, with_rich, colors) # exclude_tags = util_tags.Tags.coerce(exclude_tags) # code = self.finalize_text(with_status=with_status, # with_gaurds=with_gaurds, # with_locks=with_locks, # exclude_tags=exclude_tags) # if style == 'rich': # from rich.panel import Panel # from rich.syntax import Syntax # from rich.console import Console # console = Console() # console.print(Panel(Syntax(code, 'bash'), title=str(self.fpath))) # # console.print(Syntax(code, 'bash')) # elif style == 'colors': # header = f'# --- {str(self.fpath)}' # print(ub.highlight_code(header, 'bash')) # print(ub.highlight_code(code, 'bash')) # elif style == 'plain': # header = f'# --- {str(self.fpath)}' # print(header) # print(code) # else: # raise KeyError(f'Unknown style={style}') rprint = print_commands
[docs] def run(self, block=True, system=False, shell=1, capture=True, mode='bash', verbose=3, **kw): self.write() # TODO: can implement a monitor here for non-blocking mode detach = not block if mode == 'bash': ub.cmd(f'bash {self.fpath}', verbose=verbose, check=True, capture=capture, shell=shell, system=system, detach=detach) elif mode == 'source': ub.cmd(f'source {self.fpath}', verbose=verbose, check=True, capture=capture, shell=shell, system=system, detach=detach) else: ub.cmd(f'{mode} {self.fpath}', verbose=verbose, check=True, capture=capture, shell=shell, system=system, detach=detach)
# raise KeyError
[docs] def job_details(self): import json for job in self.jobs: print('+--------') print(f'job={job}') job_status = json.loads(job.stat_fpath.read_text()) print('job_status = {}'.format(ub.repr2(job_status, nl=1))) if job.log_fpath.exists(): print(job.log_fpath.read_text()) print('L________')
[docs] def read_state(self): import json import time max_attempts = 100 num_attempts = 0 while True: try: state = json.loads(self.state_fpath.read_text()) except FileNotFoundError: state = { 'name': self.name, 'status': 'unknown', 'total': self.num_real_jobs, 'passed': None, 'failed': None, 'skipped': None, } except json.JSONDecodeError: # we might have tried to read the file while it was being # written try again. num_attempts += 1 if num_attempts > max_attempts: raise time.sleep(0.01) continue break return state
[docs]def _bash_json_dump(json_fmt_parts, fpath): """ Make a printf command that dumps a json file indicating some status in a bash environment. Args: List[Tuple[str, str, str]]: A list of 3-tupels indicating the name of the json key, the printf code, and the bash expression to fill the printf code. fpath (str): where bash should write the json file Returns: str : the bash that will perform the printf Example: >>> from cmd_queue.serial_queue import _bash_json_dump >>> json_fmt_parts = [ >>> ('home', '%s', '$HOME'), >>> ('const', '%s', 'MY_CONSTANT'), >>> ('ps2', '"%s"', '$PS2'), >>> ] >>> fpath = 'out.json' >>> dump_code = _bash_json_dump(json_fmt_parts, fpath) >>> print(dump_code) """ printf_body_parts = [ '"{}": {}'.format(k, f) for k, f, v in json_fmt_parts ] printf_arg_parts = [ '"{}"'.format(v) for k, f, v in json_fmt_parts ] printf_body = r"'{" + ", ".join(printf_body_parts) + r"}\n'" printf_args = ' '.join(printf_arg_parts) redirect_part = '> ' + str(fpath) printf_part = 'printf ' + printf_body + ' \\\n ' + printf_args dump_code = printf_part + ' \\\n ' + redirect_part return dump_code