# Source code for cmd_queue.cli_boilerplate

"""
This file defines a scriptconfig base config that helps build cmd_queue CLIs,
so that the cmd_queue options are standardized and present at the top level.

CommandLine:
    xdoctest -m cmd_queue.cli_boilerplate __doc__:0

Example:
    >>> from cmd_queue.cli_boilerplate import CMDQueueConfig
    >>> import scriptconfig as scfg
    >>> import rich
    >>> import ubelt as ub
    >>> #
    >>> class MyQueueCLI(CMDQueueConfig):
    >>>     'A custom CLI that includes the cmd-queue boilerplate'
    >>>     my_input_file = scfg.Value(None, help='some custom param')
    >>>     my_num_steps = scfg.Value(3, help='some custom param')
    >>>     is_small = scfg.Value(False, help='some custom param')
    >>>     my_output_file = scfg.Value(None, help='some custom param')
    >>> #
    >>> def my_cli_main(cmdline=1, **kwargs):
    >>>     config = MyQueueCLI.cli(cmdline=cmdline, data=kwargs)
    >>>     rich.print('config = {}'.format(ub.urepr(config, nl=1)))
    >>>     queue = config.create_queue()
    >>>     #
    >>>     ###
    >>>     # Custom code to submit jobs to the queue
    >>>     #
    >>>     job0 = queue.submit(f'echo "processing input file: {config.my_input_file}"', name='ROOT-INPUT-JOB')
    >>>     #
    >>>     independent_outputs = []
    >>>     for idx in range(config.my_num_steps):
    >>>         job_t1 = queue.submit(f'echo "tree {idx}.S"', depends=[job0], name=f'jobname{idx}.1')
    >>>         if not config.is_small:
    >>>             job_t2 = queue.submit(f'echo "tree {idx}.SL"', depends=[job_t1], name=f'jobname{idx}.2')
    >>>             job_t3 = queue.submit(f'echo "tree {idx}.SR"', depends=[job_t2], name=f'jobname{idx}.3')
    >>>             job_t4 = queue.submit(f'echo "tree {idx}.SRR"', depends=[job_t3], name=f'jobname{idx}.4')
    >>>             job_t5 = queue.submit(f'echo "tree {idx}.SRL"', depends=[job_t3], name=f'jobname{idx}.5')
    >>>             job_t6 = queue.submit(f'echo "tree {idx}.T"', depends=[job_t4, job_t5], name=f'jobname{idx}.6')
    >>>             job_t7 = queue.submit(f'echo "tree {idx}.SLT"', depends=[job_t2], name=f'jobname{idx}.7')
    >>>             independent_outputs.extend([job_t6, job_t2])
    >>>         else:
    >>>             independent_outputs.extend([job_t1])
    >>>     #
    >>>     queue.submit(f'echo "processing output file: {config.my_output_file}"', depends=independent_outputs, name='FINAL-OUTPUT-JOB')
    >>>     ###
    >>>     #
    >>>     config.run_queue(queue)
    >>> #
    >>> # Show what happens when you use the serial backend
    >>> print('-------------------')
    >>> print('--- DEMO SERIAL ---')
    >>> print('-------------------')
    >>> my_cli_main(
    >>>     cmdline=0,
    >>>     run=0,
    >>>     print_queue=1,
    >>>     print_commands=1,
    >>>     backend='serial'
    >>> )
    >>> # Show what happens when you use the tmux backend
    >>> print('-----------------')
    >>> print('--- DEMO TMUX ---')
    >>> print('-----------------')
    >>> my_cli_main(
    >>>     cmdline=0,
    >>>     run=0,
    >>>     print_queue=0,
    >>>     is_small=True,
    >>>     my_num_steps=0,
    >>>     print_commands=1,
    >>>     backend='tmux'
    >>> )
    >>> # Show what happens when you use the slurm backend
    >>> print('------------------')
    >>> print('--- DEMO SLURM ---')
    >>> print('------------------')
    >>> my_cli_main(
    >>>     cmdline=0,
    >>>     run=0, backend='slurm',
    >>>     print_commands=1,
    >>>     print_queue=False,
    >>>     slurm_options='''
    >>>         partition: 'general-gpu'
    >>>         account: 'default'
    >>>         ntasks: 1
    >>>         gres: 'gpu:1'
    >>>         cpus_per_task: 4
    >>>     '''
    >>> )
    >>> # xdoctest: +REQUIRES(--run)
    >>> # Actually run with the defaults
    >>> print('----------------')
    >>> print('--- DEMO RUN ---')
    >>> print('----------------')
    >>> my_cli_main(cmdline=0, run=1, print_queue=0, print_commands=0)
"""
import scriptconfig as scfg
import ubelt as ub


# Extra source text kept as a plain string; nothing in this file executes it.
# NOTE(review): presumably read by a type-stub generator so that the
# ``cmd_queue.Queue`` names used in the docstrings of this module resolve
# without a module-level ``import cmd_queue`` -- TODO confirm against the
# stub tooling.
__docstubs__ = """
import cmd_queue
"""


class CMDQueueConfig(scfg.DataConfig):
    """
    A helper to carry around the common boilerplate for cmd-queue CLI's.

    The general usage is that you will inherit from this class and define
    config options your CLI cares about, however they must not overload any of
    the options specified here.

    Usage will be to call :func:`CMDQueueConfig.create_queue` to initialize a
    queue based on these options, and then execute it with
    :func:`CMDQueueConfig.run_queue`. In this way you do not need to worry
    about this specific boilerplate when writing your application.

    See ``cmd_queue.cli_boilerplate __doc__:0`` for example usage.
    """

    # Gate for actually executing the queue; ``run_queue`` only calls
    # ``queue.run`` when this is truthy.
    run = scfg.Value(
        False,
        isflag=True,
        help='if False, only prints the commands, otherwise executes them',
        group='cmd-queue',
    )

    # Passed as ``backend=`` to ``cmd_queue.Queue.create`` and also selects
    # which backend-specific keyword arguments ``create_queue`` adds.
    backend = scfg.Value(
        'tmux',
        help=('The cmd_queue backend. Can be tmux, slurm, or serial'),
        group='cmd-queue',
    )

    # Used as the queue ``name`` unless the caller passes ``name=`` itself.
    queue_name = scfg.Value(
        None,
        help='overwrite the default queue name',
        group='cmd-queue',
    )

    # 'auto' is resolved to 1 / 0 inside ``run_queue`` based on queue length.
    print_commands = scfg.Value(
        'auto',
        isflag=True,
        help='enable / disable rprint before exec',
        group='cmd-queue',
    )

    # 'auto' is resolved to 1 / 0 inside ``run_queue`` based on queue length.
    print_queue = scfg.Value(
        'auto',
        isflag=True,
        help='print the cmd queue DAG',
        group='cmd-queue',
    )

    # Forwarded unchanged to ``queue.run(with_textual=...)``.
    with_textual = scfg.Value(
        'auto',
        isflag=True,
        help='setting for cmd-queue monitoring',
        group='cmd-queue',
    )

    # Forwarded unchanged to ``queue.run(other_session_handler=...)``.
    other_session_handler = scfg.Value(
        'ask',
        help='for tmux backend only. How to handle conflicting sessions. Can be ask, kill, or ignore, or auto',
        group='cmd-queue',
    )

    # When set, registered on new queues via ``queue.add_header_command``.
    virtualenv_cmd = scfg.Value(
        None,
        type=str,
        help=ub.paragraph(
            '''
            Command to start the appropriate virtual environment if your bashrc
            does not start it by default.'''),
        group='cmd-queue',
    )

    # Becomes the ``size`` argument of the queue when the backend is tmux.
    tmux_workers = scfg.Value(
        8,
        help='number of tmux workers in the queue for the tmux backend',
        group='cmd-queue',
    )

    # Coerced through ``Yaml.coerce`` in ``__post_init__``; the resulting
    # entries are passed as keyword arguments when the backend is slurm.
    slurm_options = scfg.Value(
        None,
        help='if the backend is slurm, provide a YAML dictionary for things like partition / etc...',
        group='cmd-queue',
    )

    def __post_init__(self):
        """
        Normalize ``slurm_options`` by passing it through ``Yaml.coerce``.

        A falsy result (for instance when the option was left as None) is
        replaced with an empty dict so later code can always ``update`` from
        it.
        """
        from cmd_queue.util.util_yaml import Yaml
        coerced = Yaml.coerce(self.slurm_options)
        self.slurm_options = coerced if coerced else {}

    def create_queue(config, **kwargs):
        """
        Build an empty queue configured from the options on this config.

        Backend-specific defaults come first (the slurm options for the slurm
        backend, the worker count as ``size`` for the tmux backend), explicit
        ``kwargs`` override them, and ``name`` falls back to ``queue_name``
        when the caller did not provide one.

        Args:
            **kwargs: extra args passed to cmd_queue.Queue.create

        Returns:
            cmd_queue.Queue
        """
        import cmd_queue

        backend = config.backend
        if backend == 'slurm':
            create_kw = dict(config.slurm_options)
        elif backend == 'tmux':
            create_kw = {'size': config.tmux_workers}
        else:
            create_kw = {}

        # Caller-provided arguments take precedence over backend defaults.
        create_kw.update(kwargs)
        if 'name' not in create_kw:
            create_kw['name'] = config.queue_name

        new_queue = cmd_queue.Queue.create(backend=backend, **create_kw)

        header_cmd = config.virtualenv_cmd
        if header_cmd:
            new_queue.add_header_command(header_cmd)
        return new_queue

    def run_queue(config, queue, print_kwargs=None, **kwargs):
        """
        Report on a populated queue and optionally execute it.

        Any of ``print_commands`` / ``print_queue`` still set to ``'auto'`` is
        resolved first and written back onto the config: it becomes 1 when the
        queue has fewer than 30 jobs, otherwise a notice is printed and it
        becomes 0. The commands and the job graph are then printed if enabled,
        and the queue is executed only when ``run`` is truthy.

        Args:
            queue (cmd_queue.Queue): queue to run / report
            print_kwargs (None | Dict): keyword arguments forwarded to
                ``queue.print_commands``; None means no extra arguments
            **kwargs: forwarded to ``queue.run``
        """
        import cmd_queue
        queue: cmd_queue.Queue

        print_thresh = 30
        # Pairs of (config option, queue method the option controls); the
        # method name only appears in the notice printed for large queues.
        auto_resolvable = (
            ('print_commands', 'print_commands'),
            ('print_queue', 'print_graph'),
        )
        for option, method_name in auto_resolvable:
            if config[option] == 'auto':
                if len(queue) < print_thresh:
                    config[option] = 1
                else:
                    print(f'More than {print_thresh} jobs, skip queue.{method_name}. '
                          f'If you want to see them explicitly specify {option}=1')
                    config[option] = 0

        if config.print_commands:
            queue.print_commands(**({} if print_kwargs is None else print_kwargs))

        if config.print_queue:
            queue.print_graph(vertical_chains=True)

        if not config.run:
            return
        queue.run(
            with_textual=config.with_textual,
            other_session_handler=config.other_session_handler,
            **kwargs,
        )