"""
Airflow backend

Requires:

    pip install apache-airflow
    pip install apache-airflow[cncf.kubernetes]
"""
import ubelt as ub
from cmd_queue import base_queue  # NOQA

class AirflowJob(base_queue.Job):
    """
    Represents an Airflow job that has not been executed yet.

    The job only records its configuration; no Airflow operator is created
    here.  :meth:`finalize_text` renders the line of Python source that
    constructs the corresponding ``BashOperator`` inside a generated DAG
    file.

    Args:
        command (str): bash command the task should execute.
        name (str | None): task id. A random ``job-<uuid4>`` name is
            generated when omitted.
        output_fpath: stored on the job; not otherwise used by this class.
        depends: a single upstream job or an iterable of them. A single
            (non-iterable) value is wrapped in a list.
        partition: accepted for signature compatibility; ignored here.
        cpus, gpus, mem, begin, shell: stored on the job; not otherwise
            used by this class.
        **kwargs: any extra options, kept in ``unused_kwargs``.
    """

    def __init__(self, command, name=None, output_fpath=None, depends=None,
                 partition=None, cpus=None, gpus=None, mem=None, begin=None,
                 shell=None, **kwargs):
        super().__init__()
        if name is None:
            import uuid
            name = 'job-' + str(uuid.uuid4())
        if depends is not None and not ub.iterable(depends):
            depends = [depends]
        self.unused_kwargs = kwargs
        # Keep the command and the name in separate attributes; the name
        # becomes the Airflow task id and the command the bash payload.
        self.command = command
        self.name = name
        self.output_fpath = output_fpath
        self.depends = depends
        self.cpus = cpus
        self.gpus = gpus
        self.mem = mem
        self.begin = begin
        self.shell = shell
        # if shell not in {None, 'bash'}:
        #     raise NotImplementedError(shell)

        # TODO:
        # Unfortunately, we need to write out a python file that actually
        # contains this code.
        # self.operator = BashOperator(
        #     task_id=self.name,
        #     bash_command=command,
        #     dag=dag,
        # )
        # cwd
        # env
        # if depends:
        #     for dep in depends:
        #         if dep is not None:
        #             self.operator.set_upstream(dep.operator)
        # self.jobid = None  # only set once this is run (maybe)

    def __nice__(self):
        """Return the repr of the command for the nice-repr display."""
        return repr(self.command)

    def finalize_text(self):
        """
        Render the source line that declares this job in a DAG file.

        Returns:
            str: an assignment into a ``jobs`` dict of a ``BashOperator``
            whose ``task_id`` is this job's name. The emitted code expects
            ``jobs``, ``BashOperator`` and ``dag`` to exist where it runs.
        """
        dagvar = 'dag'
        return (
            f'jobs[{self.name!r}] = BashOperator('
            f'task_id={self.name!r}, '
            f'bash_command={self.command!r}, '
            f'dag={dagvar})'
        )
class AirflowQueue(base_queue.Queue):
    """
    Queue that renders submitted commands as an Airflow DAG definition file.

    Jobs are collected with :meth:`submit`; :meth:`finalize_text` produces
    the Python source of the DAG and :meth:`run` asks the ``airflow``
    command line tool to backfill it.

    Example:
        >>> # xdoctest: +REQUIRES(module:airflow)
        >>> # xdoctest: +SKIP
        >>> from cmd_queue.airflow_queue import *  # NOQA
        >>> self = AirflowQueue('zzz_mydemo')
        >>> job1 = self.submit('echo hi 1 && true')
        >>> job2 = self.submit('echo hi 2 && true')
        >>> job3 = self.submit('echo hi 3 && true', depends=job1)
        >>> self.print_commands()
        >>> self.write()
        >>> # self.read_state()

    Ignore:
        airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
        AIRFLOW__CORE__DAGS_FOLDER="." airflow standalone
        AIRFLOW__CORE__DAGS_FOLDER="." airflow scheduler
        AIRFLOW__CORE__DAGS_FOLDER="." airflow webserver
        AIRFLOW__CORE__DAGS_FOLDER="." airflow dags test zzz_cmd_queue_demo_dag $(date +"%Y-%m-%d")
        AIRFLOW__CORE__DAGS_FOLDER="." airflow dags list
        AIRFLOW__CORE__DAGS_FOLDER="." airflow tasks list zzz_cmd_queue_demo_dag
        AIRFLOW__CORE__DAGS_FOLDER="." airflow tasks clear zzz_cmd_queue_demo_dag
        AIRFLOW__CORE__DAGS_FOLDER="." airflow dags backfill zzz_cmd_queue_demo_dag --start-date $(date +"%Y-%m-%d")
        cd /home/joncrall/.cache/cmd_queue/SQ-20220711T180827-12f2905e
    """

    def __init__(self, name=None, shell=None, **kwargs):
        """
        Args:
            name (str | None): DAG id and prefix of the queue id.
                Defaults to ``'SQ'``.
            shell (str | None): default ``shell`` option forwarded to jobs
                that do not specify one.
            **kwargs: any extra options, kept in ``unused_kwargs``.
        """
        super().__init__()
        import uuid
        import time
        self.jobs = []
        if name is None:
            name = 'SQ'
        self.name = name
        stamp = time.strftime('%Y%m%dT%H%M%S')
        self.unused_kwargs = kwargs
        # Timestamp plus a short random hash keeps queue ids (and therefore
        # their working directories) distinct between instances.
        self.queue_id = name + '-' + stamp + '-' + ub.hash_data(uuid.uuid4())[0:8]
        self.dpath = ub.Path.appdir('cmd_queue') / self.queue_id
        self.log_dpath = self.dpath / 'logs'
        self.fpath = self.dpath / (self.queue_id + '.py')
        self.shell = shell
        self.header_commands = []
        self.all_depends = None
        # Sketch of constructing the DAG object directly (not used):
        # from airflow import DAG
        # from datetime import timedelta
        # self.dag = DAG(
        #     # dag_id=self.queue_id,
        #     dag_id=ub.hash_data(self.queue_id)[0:8],
        #     # These args will get passed on to each operator
        #     # You can override them on a per-task basis during operator initialization
        #     default_args={
        #         # 'depends_on_past': False,
        #         # 'retries': 1,
        #         'retries': 0,
        #         # 'retry_delay': timedelta(minutes=5),
        #     },
        #     # description='A simple tutorial DAG',
        #     max_active_runs=1,
        #     schedule_interval=timedelta(days=1),
        #     start_date=ub.timeparse(ub.timestamp()),
        #     catchup=False,
        #     # tags=['example'],
        #     tags=[self.queue_id],
        # )

    @classmethod
    def is_available(cls):
        """
        Determines if we can run the airflow queue or not.

        Returns:
            bool: currently always False.
        """
        # TODO: get this working
        return False

    def run(self, block=True, system=False):
        """
        Write the DAG file and backfill it with the ``airflow`` CLI.

        Args:
            block (bool): when False the command is started detached.
            system (bool): forwarded to ``ub.cmd``.
        """
        self.write()
        # TODO: can implement a monitor here for non-blocking mode
        import os
        env = os.environ.copy()
        # Point airflow at the directory that holds the generated DAG file.
        env['AIRFLOW__CORE__DAGS_FOLDER'] = os.fspath(self.dpath)
        detach = not block
        # The start date is computed by the shell, hence shell=True.
        ub.cmd(
            f'airflow dags backfill {self.name} --start-date $(date +"%Y-%m-%d")',
            verbose=3, check=True, shell=True, system=system, detach=detach,
            env=env)

    def finalize_text(self):
        """
        Build the Python source of the DAG definition file.

        The text consists of a header that creates the ``dag`` object and an
        empty ``jobs`` dict, one operator declaration per submitted job, and
        then one ``set_upstream`` line per dependency edge.

        Returns:
            str: the generated source code.
        """
        header = ub.codeblock(
            f'''
            from airflow import DAG
            from datetime import timezone
            from datetime import datetime as datetime_cls
            from airflow.operators.bash import BashOperator
            now = datetime_cls.utcnow().replace(tzinfo=timezone.utc)
            dag = DAG(
                '{self.name}',
                start_date=now,
                catchup=False,
                tags=['example'],
            )
            jobs = dict()
            '''
        )
        parts = [header]
        # Declare every operator before wiring edges so each
        # ``jobs[...]`` lookup in the generated code is already defined.
        for job in self.jobs:
            parts.append(job.finalize_text())
        for job in self.jobs:
            for dep in job.depends or []:
                if dep is not None:
                    parts.append(
                        f'jobs[{job.name!r}].set_upstream(jobs[{dep.name!r}])')
        text = '\n'.join(parts)
        return text

    def submit(self, command, **kwargs):
        """
        Create a job for ``command`` and register it with this queue.

        Args:
            command (str): bash command to run.
            **kwargs: options forwarded to :class:`AirflowJob`. ``name``
                and ``output_fpath`` are filled in when missing, and
                ``depends`` may be a job, a job name, or an iterable of
                either.

        Returns:
            AirflowJob: the newly registered job.

        Raises:
            KeyError: if a dependency is given by a name that has not been
                submitted to this queue.
        """
        name = kwargs.get('name', None)
        if name is None:
            # Zero-padded index so auto-generated names sort in submission order.
            name = kwargs['name'] = f'J{len(self.jobs):04d}-{self.queue_id}'
        if 'output_fpath' not in kwargs:
            kwargs['output_fpath'] = self.log_dpath / (name + '.sh')
        if self.shell is not None:
            kwargs['shell'] = kwargs.get('shell', self.shell)

        depends = kwargs.pop('depends', None)
        # Normalize a single job / job name to a list so it is never
        # iterated element-wise below.
        if depends is not None and not ub.iterable(depends):
            depends = [depends]
        if self.all_depends:
            # Queue-wide dependencies come first, then the job's own.
            depends = list(self.all_depends) + list(depends or [])
        if depends is not None:
            # Resolve any strings to job objects
            depends = [
                self.named_jobs[dep] if isinstance(dep, str) else dep
                for dep in depends
            ]

        job = AirflowJob(command, depends=depends, **kwargs)
        self.jobs.append(job)
        # NOTE(review): num_real_jobs / named_jobs are presumably initialized
        # by base_queue.Queue.__init__ -- confirm against the base class.
        self.num_real_jobs += 1
        self.named_jobs[job.name] = job
        return job

    def print_commands(self, with_status=False, with_gaurds=False,
                       with_locks=1, exclude_tags=None, style='auto',
                       with_rich=None, colors=1, **kwargs):
        r"""
        Print info about the commands, optionally with rich

        Args:
            style (str): resolved through ``self._coerce_style`` together
                with ``with_rich`` and ``colors``; the resolved value must
                be one of ``'rich'``, ``'colors'`` or ``'plain'``.
            with_rich: forwarded to ``self._coerce_style``.
            colors: forwarded to ``self._coerce_style``.
            with_status, with_gaurds, with_locks, exclude_tags, **kwargs:
                accepted but not used by this method.

        Raises:
            KeyError: if the resolved style is not recognized.

        Example:
            >>> # xdoctest: +SKIP
            >>> from cmd_queue.airflow_queue import *  # NOQA
            >>> self = AirflowQueue()
            >>> self.submit('date')
            >>> self.print_commands()
        """
        style = self._coerce_style(style, with_rich, colors)
        code = self.finalize_text()
        if style == 'rich':
            from rich.panel import Panel
            from rich.syntax import Syntax
            from rich.console import Console
            console = Console()
            console.print(Panel(Syntax(code, 'python'), title=str(self.fpath)))
        elif style == 'colors':
            header = f'# --- {str(self.fpath)}'
            print(ub.highlight_code(header, 'python'))
            print(ub.highlight_code(code, 'python'))
        elif style == 'plain':
            header = f'# --- {str(self.fpath)}'
            print(header)
            print(code)
        else:
            raise KeyError(f'Unknown style={style}')

    rprint = print_commands  # backwards compat
def demo():
    """
    Build a two-task DAG by hand and run it locally.

    Airflow requires initialization:

        airflow db init

    Ignore:
        from cmd_queue.airflow_queue import *  # NOQA
        demo()
    """
    # Imported lazily so the module can be imported without airflow.
    from airflow import DAG
    from datetime import timezone
    from datetime import datetime as datetime_cls
    from airflow.operators.bash import BashOperator
    # Timezone-aware "now" in UTC.
    now = datetime_cls.now(timezone.utc)
    dag = DAG(
        'mycustomdag',
        start_date=now,
        catchup=False,
        tags=['example'],
    )
    t1 = BashOperator(task_id='task1', bash_command='date', dag=dag)
    t2 = BashOperator(task_id='task2', bash_command='echo hi 1 && true', dag=dag)
    # task2 runs only after task1.
    t2.set_upstream(t1)
    # NOTE(review): only the ``local=True`` argument of this call is
    # certain; ``verbose=True`` is assumed -- confirm. ``DAG.run`` is an
    # Airflow 2.x API.
    dag.run(verbose=True, local=True)
if __name__ == '__main__':
    """
    CommandLine:
        python ~/code/cmd_queue/cmd_queue/airflow_queue.py
    """
    # Run the hand-written demo DAG when executed as a script.
    demo()