Cookbook

Example files

This cookbook uses three sample Python files.

## ./boil_water.py
print("I am boiling water...")
## ./steep_tea.py
print("I am steeping tea...")
## ./pour_tea.py
print("I am pouring tea...")

We also use Git Bash as the terminal. Bash commands work on Windows and Unix machines unless otherwise stated.

Tasks

Configure a Task

>>> from dequindre import Task
>>> pour_tea = Task('./pour_tea.py')
>>> pour_tea
Task(./pour_tea.py)
>>> pour_tea.loc
'./pour_tea.py'
>>> pour_tea.env
'python'

Note that the Python environment defaulted to ‘python’. To use a different environment, we’ll need to define it first.
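To make loc and env concrete, here is a minimal sketch that assumes running a task amounts to calling an interpreter (env) on a script path (loc). The run_script helper is hypothetical and is not part of dequindre, and Dequindre may launch tasks differently internally. The sketch uses sys.executable instead of 'python' so that it runs even when python is not on the PATH.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_script(loc, env=sys.executable):
    """Run the script at loc with the interpreter env and return its stdout.

    Hypothetical helper for illustration; not dequindre's implementation.
    """
    result = subprocess.run(
        [env, str(loc)], capture_output=True, text=True, check=True
    )
    return result.stdout


with tempfile.TemporaryDirectory() as tmp:
    # Recreate one of the sample files in a throwaway directory.
    script = Path(tmp) / "pour_tea.py"
    script.write_text('print("I am pouring tea...")\n')
    output = run_script(script)

print(output, end="")
```

Passing a different interpreter path as env is all it takes to run the same script in another environment.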

Commons

The commons submodule removes clutter from your config files. Even small workflows repeat the same parent directories and environments across many tasks, so the commons submodule provides context managers that let you state each shared value once.

Common Tasks

>>> from dequindre.commons import common_task

>>> common_prefix = '/long/path/to/tea-tasks'
>>> with common_task(common_prefix) as T:
...     pour_tea = T(loc='pour_tea.py')
...     drink_tea = T(loc='drink_tea.py')
...
>>> pour_tea
Task(/long/path/to/tea-tasks/pour_tea.py)
>>> drink_tea
Task(/long/path/to/tea-tasks/drink_tea.py)
>>> pour_tea.env
'python'

The resulting task definitions are much easier to read.

Common Environments

Paths to virtual environment interpreters get long and ugly fast, so it helps to define them once and keep them out of sight.

>>> from dequindre import Task
>>> from dequindre.commons import common_venv

>>> common_prefix = '/my/very/long/path'
>>> with common_venv(common_prefix) as E:
...     tea_env = E('tea-env')
...     biscuit_env = E('biscuit-env')
...
>>> tea_env
'/my/very/long/path/tea-env/bin/python'
>>> drink_tea = Task('./drink_tea.py', tea_env)
>>> drink_tea.env
'/my/very/long/path/tea-env/bin/python'

Notice that common_venv filled in the expected suffix: /bin/python. You can override this behavior with the common_suffix argument.

The same functionality is also supported for pipenv environments and conda environments through the common_pipenv and common_conda_env functions respectively.
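The prefix-and-suffix behavior described above can be sketched with a small context manager. This is a hypothetical stand-in named prefixed_venv, written only to illustrate the pattern; it is not dequindre's source, and only the common_suffix argument name is taken from the text above.

```python
import posixpath
from contextlib import contextmanager


@contextmanager
def prefixed_venv(common_prefix, common_suffix="bin/python"):
    """Yield a function that expands an environment name into an interpreter path.

    Hypothetical stand-in for illustration; not dequindre's common_venv.
    """
    def build(env_name):
        # posixpath keeps forward slashes regardless of the host OS.
        return posixpath.join(common_prefix, env_name, common_suffix)

    yield build


with prefixed_venv("/my/very/long/path") as E:
    tea_env = E("tea-env")
    biscuit_env = E("biscuit-env")

print(tea_env)      # /my/very/long/path/tea-env/bin/python
print(biscuit_env)  # /my/very/long/path/biscuit-env/bin/python
```

Overriding the suffix is useful on Windows, where a virtual environment typically keeps its interpreter under Scripts/python.exe rather than bin/python.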

DAGs

Configure a DAG

>>> from dequindre import Task, DAG

>>> ## define tasks
>>> boil_water = Task('./boil_water.py')
>>> steep_tea = Task('./steep_tea.py')
>>> pour_tea = Task('./pour_tea.py')

>>> make_tea = DAG()
>>> make_tea.add_dependencies({
...       steep_tea: boil_water,
...       pour_tea: steep_tea
...   })

Dequindre Schedulers

The Dequindre scheduler is the last major object in dequindre. After defining your tasks and task dependencies in the DAG, you can create a Dequindre scheduler.

>>> from dequindre import Task, DAG, Dequindre

>>> ## define tasks
>>> boil_water = Task('./boil_water.py')
>>> steep_tea = Task('./steep_tea.py')
>>> pour_tea = Task('./pour_tea.py')

>>> make_tea = DAG()
>>> make_tea.add_dependencies({
...       steep_tea: boil_water,
...       pour_tea: steep_tea
...   })

>>> dq = Dequindre(make_tea)
>>> dq.get_schedules()
defaultdict(<class 'set'>, {
    1: {Task(./boil_water.py)},
    2: {Task(./steep_tea.py)},
    3: {Task(./pour_tea.py)}})
>>> dq.run_tasks()

Running Task(./boil_water.py)

I am boiling water...

Running Task(./steep_tea.py)

I am steeping tea...

Running Task(./pour_tea.py)

I am pouring tea...
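To see where schedule numbers like 1, 2, 3 can come from, here is a minimal sketch that assigns each task to one round after the latest round of its dependencies. It uses plain strings in place of Task objects and the standard library's graphlib module (Python 3.9+). The build_schedules function is hypothetical; Dequindre may compute its schedules differently.

```python
from collections import defaultdict
from graphlib import TopologicalSorter  # Python 3.9+


def build_schedules(dependencies):
    """Map each schedule number to the set of tasks that can run in that round.

    dependencies maps a task to the set of tasks it depends on.
    Hypothetical helper for illustration; not dequindre's implementation.
    """
    level = {}
    # static_order() yields every task after all of its dependencies
    # and raises graphlib.CycleError if the graph contains a cycle.
    for task in TopologicalSorter(dependencies).static_order():
        parents = dependencies.get(task, ())
        level[task] = 1 + max((level[p] for p in parents), default=0)

    schedules = defaultdict(set)
    for task, number in level.items():
        schedules[number].add(task)
    return dict(schedules)


make_tea = {
    "steep_tea": {"boil_water"},
    "pour_tea": {"steep_tea"},
}
print(build_schedules(make_tea))
# {1: {'boil_water'}, 2: {'steep_tea'}, 3: {'pour_tea'}}
```

Tasks that land in the same round do not depend on each other, so a round with several tasks shows where a workflow fans out.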

Error Handling

By default, Dequindre uses soft error handling: if one task fails, Dequindre assumes it’s a non-critical error and continues with the remaining tasks. That isn’t always what we want. Sometimes a single failed task should cause the whole schedule to fail.

Dequindre.run_tasks() takes an optional error_handling argument that accepts one of two values: error_handling='soft' or error_handling='hard'. The latter raises an EarlyAbortError if any task fails.
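The difference between the two modes can be sketched without dequindre at all. In the example below, run_all is a hypothetical runner over plain functions, and EarlyAbortError is a local stand-in class defined only so the sketch runs on its own. Neither is the library's code.

```python
class EarlyAbortError(Exception):
    """Local stand-in so the sketch is self-contained; not dequindre's class."""


def run_all(tasks, error_handling="soft"):
    """Run (name, function) pairs in order and return the names that failed.

    Hypothetical runner for illustration; not dequindre's run_tasks().
    """
    if error_handling not in ("soft", "hard"):
        raise ValueError("error_handling must be 'soft' or 'hard'")
    failed = []
    for name, func in tasks:
        try:
            func()
        except Exception as err:
            if error_handling == "hard":
                # Hard mode: stop at the first failure.
                raise EarlyAbortError(f"{name} failed") from err
            # Soft mode: record the failure and keep going.
            failed.append(name)
    return failed


def boil_water():
    print("I am boiling water...")


def steep_tea():
    raise RuntimeError("the kettle is empty")


def pour_tea():
    print("I am pouring tea...")


tasks = [("boil_water", boil_water), ("steep_tea", steep_tea), ("pour_tea", pour_tea)]

soft_failures = run_all(tasks, error_handling="soft")
print(soft_failures)  # ['steep_tea']

try:
    run_all(tasks, error_handling="hard")
except EarlyAbortError as err:
    print(f"Schedule aborted: {err}")  # Schedule aborted: steep_tea failed
```

In soft mode pour_tea still runs after steep_tea fails. In hard mode it never starts, which is usually what you want when later tasks depend on the output of earlier ones.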

Bringing It All Together

After reviewing the cookbook so far, we’re ready to write a complete Dequindre schedule. The script below is a good reference when you’re building your first schedules.

from pprint import pprint

from dequindre import Task, DAG, Dequindre
from dequindre.commons import common_task, common_conda_env


def run_schedule():
    print('Starting run-my-schedule...')

    CONDA_PREFIX = '/opt/conda/envs'
    with common_conda_env(CONDA_PREFIX) as conda_env:
        python27 = conda_env('python27')
        python36 = conda_env('python36')

    TASK_PATTERN = '/opt/my-tasks/{}/main.py'
    with common_task(TASK_PATTERN, python27) as T:
        leave_home    = T('leave_home')
        get_fuel      = T('get_fuel')
        get_groceries = T('get_groceries')

    with common_task(TASK_PATTERN, python36) as T:
        pay_rent      = T('pay_rent')
        return_home   = T('return_home')
        make_dinner   = T('make_dinner')
        go_to_bed     = T('go_to_bed')

    dag = DAG(tasks={
        leave_home, get_fuel, get_groceries,
        pay_rent, return_home, make_dinner, go_to_bed
    })
    dag.add_dependencies({
        get_fuel: leave_home,
        get_groceries: leave_home,
        pay_rent: leave_home,
        return_home: {get_fuel, get_groceries, pay_rent},
        make_dinner: {return_home, get_groceries},
        go_to_bed: make_dinner
    })

    dq = Dequindre(dag)
    schedules = dq.get_schedules()
    pprint(schedules)
    # {1: {Task(/opt/my-tasks/leave_home/main.py)},
    #  2: {Task(/opt/my-tasks/get_fuel/main.py),
    #      Task(/opt/my-tasks/pay_rent/main.py),
    #      Task(/opt/my-tasks/get_groceries/main.py)},
    #  3: {Task(/opt/my-tasks/return_home/main.py)},
    #  4: {Task(/opt/my-tasks/make_dinner/main.py)},
    #  5: {Task(/opt/my-tasks/go_to_bed/main.py)}}

    dq.run_tasks()


if __name__ == '__main__':
    run_schedule()