Cookbook¶
Example files¶
This cookbook uses three sample Python files.
## ./boil_water.py
print("I am boiling water...")
## ./steep_tea.py
print("I am steeping tea...")
## ./pour_tea.py
print("I am pouring tea...")
We also use Git Bash as the terminal. Unless otherwise stated, the Bash commands work on both Windows and Unix machines.
Tasks¶
Configure a Task¶
>>> from dequindre import Task
>>> pour_tea = Task('./pour_tea.py')
>>> pour_tea
Task(./pour_tea.py)
>>> pour_tea.loc
'./pour_tea.py'
>>> pour_tea.env
'python'
Note that the Python environment defaulted to ‘python’. To use a different environment, we first need to define it.
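To make the role of env concrete, the sketch below shows one way a runner could pair an interpreter command with a script path. It is an assumption about the general idea, not dequindre's actual implementation: run_script is a hypothetical helper, and sys.executable stands in for the ‘python’ default so the example runs on any machine.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_script(loc, env=sys.executable):
    """Run the script at `loc` with the interpreter `env`; return its stdout.

    Hypothetical helper, not part of dequindre.
    """
    result = subprocess.run(
        [env, str(loc)],
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError on a non-zero exit code
    )
    return result.stdout


# Write a throwaway copy of pour_tea.py and run it.
with tempfile.TemporaryDirectory() as tmp:
    script = Path(tmp) / 'pour_tea.py'
    script.write_text('print("I am pouring tea...")\n')
    output = run_script(script)

print(output, end='')
```

Passing a different interpreter path as env would run the same script in a different environment, which is the idea the following sections build on.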
Commons¶
The commons submodule removes clutter from your config files. Even small workflows share many parent directories and environments. To this end, the commons submodule makes heavy use of context managers for readability.
Common Tasks¶
>>> from dequindre.commons import common_task
>>> common_prefix = '/long/path/to/tea-tasks'
>>> with common_task(common_prefix) as T:
...     pour_tea = T(loc='pour_tea.py')
...     drink_tea = T(loc='drink_tea.py')
...
>>> pour_tea
Task(/long/path/to/tea-tasks/pour_tea.py)
>>> drink_tea
Task(/long/path/to/tea-tasks/drink_tea.py)
>>> pour_tea.env
'python'
The resulting task definitions are much easier to read.
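For readers curious how a context manager can factor out a shared prefix, here is a minimal sketch built only on the standard library. It is not dequindre's code: common_prefix_sketch and prefixed_path are hypothetical names, and the sketch yields plain path strings rather than Task objects.

```python
import posixpath
from contextlib import contextmanager
from functools import partial


def prefixed_path(prefix, loc):
    """Join a shared prefix and a task-specific location."""
    return posixpath.join(prefix, loc)


@contextmanager
def common_prefix_sketch(prefix):
    """Yield a callable that prepends `prefix` to every location.

    Hypothetical stand-in for the idea behind common_task.
    """
    yield partial(prefixed_path, prefix)


with common_prefix_sketch('/long/path/to/tea-tasks') as T:
    pour_tea_loc = T('pour_tea.py')
    drink_tea_loc = T('drink_tea.py')

print(pour_tea_loc)
print(drink_tea_loc)
```

The prefix is stated once in the with statement, and each line inside the block carries only the part that differs.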
Common Environments¶
Paths to virtual environments get ugly fast, and for many users they’re best defined once and kept out of sight.
>>> from dequindre import Task
>>> from dequindre.commons import common_venv
>>> common_prefix = '/my/very/long/path'
>>> with common_venv(common_prefix) as E:
...     tea_env = E('tea-env')
...     biscuit_env = E('biscuit-env')
...
>>> tea_env
'/my/very/long/path/tea-env/bin/python'
>>> drink_tea = Task('./drink_tea.py', tea_env)
>>> drink_tea.env
'/my/very/long/path/tea-env/bin/python'
Notice that common_venv filled in the expected suffix: /bin/python. You can override this behavior with the common_suffix argument.
The same functionality is also supported for pipenv environments and conda environments through the common_pipenv and common_conda_env functions respectively.
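The path layout described above can be sketched as a small function. This is an illustration under stated assumptions rather than dequindre's implementation: interpreter_path is a hypothetical helper, its suffix parameter plays the role the text gives to common_suffix, and the C:/envs prefix is made up for the example.

```python
import posixpath


def interpreter_path(prefix, name, suffix='bin/python'):
    """Build the interpreter path for the environment `name` under `prefix`.

    Hypothetical helper, not part of dequindre.
    """
    return posixpath.join(prefix, name, suffix)


tea_env = interpreter_path('/my/very/long/path', 'tea-env')
print(tea_env)

# Windows virtual environments keep the interpreter under Scripts/,
# which is one reason to override the default suffix.
win_env = interpreter_path('C:/envs', 'tea-env', suffix='Scripts/python.exe')
print(win_env)
```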
DAGs¶
Configure a DAG¶
>>> from dequindre import Task, DAG
>>> ## define tasks
>>> boil_water = Task('./boil_water.py')
>>> steep_tea = Task('./steep_tea.py')
>>> pour_tea = Task('./pour_tea.py')
>>> make_tea = DAG()
>>> make_tea.add_dependencies({
...     steep_tea: boil_water,
...     pour_tea: steep_tea
... })
Dequindre Schedulers¶
The Dequindre scheduler is the last major object in dequindre. After defining your tasks and task dependencies in the DAG, you can create a Dequindre scheduler.
>>> from dequindre import Task, DAG, Dequindre
>>> ## define tasks
>>> boil_water = Task('./boil_water.py')
>>> steep_tea = Task('./steep_tea.py')
>>> pour_tea = Task('./pour_tea.py')
>>> make_tea = DAG()
>>> make_tea.add_dependencies({
...     steep_tea: boil_water,
...     pour_tea: steep_tea
... })
>>> dq = Dequindre(make_tea)
>>> dq.get_schedules()
defaultdict(<class 'set'>, {
    1: {Task(./boil_water.py)},
    2: {Task(./steep_tea.py)},
    3: {Task(./pour_tea.py)}})
>>> dq.run_tasks()
Running Task(./boil_water.py)
I am boiling water...
Running Task(./steep_tea.py)
I am steeping tea...
Running Task(./pour_tea.py)
I am pouring tea...
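The numbered schedule returned above groups tasks so that each one runs after everything it depends on. The sketch below shows one simple way to derive such levels from a dependency mapping. It is not dequindre's algorithm: schedule_levels is a hypothetical helper, tasks are plain strings, and the graph is assumed to be acyclic.

```python
from collections import defaultdict


def schedule_levels(dependencies):
    """Group tasks into numbered levels so each runs after its upstream tasks.

    `dependencies` maps a task name to the set of tasks it depends on.
    Assumes the graph has no cycles. Hypothetical helper, not part of dequindre.
    """
    # Collect every task, including those that appear only as dependencies.
    tasks = set(dependencies)
    for upstream in dependencies.values():
        tasks |= set(upstream)

    levels = {}

    def level_of(task):
        if task not in levels:
            upstream = dependencies.get(task, ())
            # A task with no dependencies gets level 1; otherwise it sits
            # one level after its latest upstream task.
            levels[task] = 1 + max((level_of(u) for u in upstream), default=0)
        return levels[task]

    schedule = defaultdict(set)
    for task in tasks:
        schedule[level_of(task)].add(task)
    return dict(schedule)


tea = {
    'steep_tea': {'boil_water'},
    'pour_tea': {'steep_tea'},
}
schedule = schedule_levels(tea)
print(sorted(schedule.items()))
```

Tasks that share a level have no dependencies on one another, so the order within a level does not matter.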
Error Handling¶
By default, Dequindre uses soft error handling: if one task fails, Dequindre treats it as a non-critical error and continues with the remaining tasks. We don’t always want this behavior. Sometimes a single failed task should make the whole schedule fail.
Dequindre.run_tasks() has an optional error_handling argument that takes one of two values: error_handling='soft' or error_handling='hard'. The latter will raise an EarlyAbortError if any of the tasks fail.
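The difference between the two modes can be illustrated with a small stand-alone sketch. It does not use dequindre: run_all is a hypothetical runner over plain Python callables, and EarlyAbortSketch is a local stand-in for the EarlyAbortError named above.

```python
class EarlyAbortSketch(Exception):
    """Local stand-in for the EarlyAbortError named in the text."""


def run_all(tasks, error_handling='soft'):
    """Call each task in order and return the names of the tasks that failed.

    'soft' records a failure and keeps going; 'hard' aborts on the first one.
    """
    if error_handling not in ('soft', 'hard'):
        raise ValueError("error_handling must be 'soft' or 'hard'")

    failed = []
    for name, func in tasks:
        try:
            func()
        except Exception as err:
            if error_handling == 'hard':
                raise EarlyAbortSketch(f'{name} failed') from err
            failed.append(name)
    return failed


def steep_tea():
    raise RuntimeError('out of tea leaves')


completed = []
tasks = [
    ('boil_water', lambda: completed.append('boil_water')),
    ('steep_tea', steep_tea),
    ('pour_tea', lambda: completed.append('pour_tea')),
]

# Soft: the failure is recorded and pour_tea still runs.
soft_failures = run_all(tasks, error_handling='soft')
soft_completed = list(completed)

# Hard: the run stops at steep_tea, so pour_tea never runs.
completed.clear()
try:
    run_all(tasks, error_handling='hard')
    aborted = False
except EarlyAbortSketch:
    aborted = True
hard_completed = list(completed)

print(soft_failures, soft_completed)
print(aborted, hard_completed)
```

Hard handling suits schedules where downstream tasks are meaningless once an upstream task has failed.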
Bringing It All Together¶
After reviewing the cookbook so far, we’re ready to write a complete Dequindre schedule. It makes a good reference when you’re building your first schedules.
from pprint import pprint

from dequindre import Task, DAG, Dequindre
from dequindre.commons import common_task, common_conda_env


def run_schedule():
    print('Starting run-my-schedule...')

    CONDA_PREFIX = '/opt/conda/envs'
    with common_conda_env(CONDA_PREFIX) as conda_env:
        python27 = conda_env('python27')
        python36 = conda_env('python36')

    TASK_PATTERN = '/opt/my-tasks/{}/main.py'
    with common_task(TASK_PATTERN, python27) as T:
        leave_home = T('leave_home')
        get_fuel = T('get_fuel')
        get_groceries = T('get_groceries')

    with common_task(TASK_PATTERN, python36) as T:
        pay_rent = T('pay_rent')
        return_home = T('return_home')
        make_dinner = T('make_dinner')
        go_to_bed = T('go_to_bed')

    dag = DAG(tasks={
        leave_home, get_fuel, get_groceries,
        pay_rent, return_home, make_dinner, go_to_bed
    })
    dag.add_dependencies({
        get_fuel: leave_home,
        get_groceries: leave_home,
        pay_rent: leave_home,
        return_home: {get_fuel, get_groceries, pay_rent},
        make_dinner: {return_home, get_groceries},
        go_to_bed: make_dinner
    })

    dq = Dequindre(dag)
    schedules = dq.get_schedules()
    pprint(schedules)
    # {1: {Task(/opt/my-tasks/leave_home/main.py)},
    #  2: {Task(/opt/my-tasks/get_fuel/main.py),
    #      Task(/opt/my-tasks/pay_rent/main.py),
    #      Task(/opt/my-tasks/get_groceries/main.py)},
    #  3: {Task(/opt/my-tasks/return_home/main.py)},
    #  4: {Task(/opt/my-tasks/make_dinner/main.py)},
    #  5: {Task(/opt/my-tasks/go_to_bed/main.py)}}

    dq.run_tasks()


if __name__ == '__main__':
    run_schedule()