Dequindre Module
Task, DAG, and Dequindre classes
This module defines the Task, DAG, and Dequindre classes. Tasks are intended to hold task-level data. DAGs are intended to hold relationships between Tasks. Dequindre schedules Tasks in accordance with the DAG.
-
class dequindre.DAG(*, tasks: set = None, dependencies: dict = None)
Bases: object
Define a DAG and relationships between tasks.
A DAG is a directed acyclic graph with tasks and dependencies as nodes and directed edges respectively. You have the option to define all the tasks and dependencies at once if you prefer that syntax.
Note
It may not be obvious that a DAG can contain more than one graph, or that new Tasks introduced by edges are automatically added to the set of tasks.
-
tasks
set of Task – The set of all tasks. Dequindre will try to run every task in this attribute.
-
_edges
dict of Task to set of Task – A dict of directed edges from one Task to a set of Tasks. Access directly at your own peril.
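The two points in the note above can be sketched with plain strings and built-in containers. This is an illustrative stand-in, not dequindre's code: the helper name `add_dependency` mirrors the method documented below, but the edge direction (upstream to downstream) and the storage layout are assumptions.

```python
# Illustrative stand-in, not dequindre's implementation.
# Assumption: edges map an upstream task to the set of its downstream tasks.
tasks = set()
edges = {}

def add_dependency(task, depends_on):
    """Record that `task` runs after `depends_on`, registering both tasks."""
    tasks.update({task, depends_on})
    edges.setdefault(depends_on, set()).add(task)

# Two unrelated chains live side by side in the same structure.
add_dependency("steep_tea", depends_on="boil_water")
add_dependency("butter_toast", depends_on="toast_bread")
```

No task was registered explicitly, yet all four end up in `tasks`, and the two chains share no edge.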
-
add_dependencies(d: Dict[dequindre.Task, Set[dequindre.Task]]) → None
Add multiple dependencies to the DAG.
Parameters: d (dict of Task to set of Task) – An adjacency dict mapping each downstream Task to one or more upstream Tasks.
Note
If any of the tasks do not yet exist in the DAG, they are automatically added to it.
Examples
>>> from dequindre import Task, DAG
>>> boil_water = Task('boil_water.py')
>>> prep_infuser = Task('prep_infuser.py')
>>> steep_tea = Task('steep_tea.py')
>>> dag = DAG()
>>> dag.add_dependencies({steep_tea: {boil_water, prep_infuser}})
-
add_dependency(task: dequindre.Task, depends_on: dequindre.Task) → None
Add a dependency to the DAG.
Parameters:
- task (Task) – The downstream task.
- depends_on (Task) – The upstream task.
Note
If either task does not yet exist in the DAG, it is automatically added.
Examples
>>> from dequindre import Task, DAG
>>> boil_water = Task('boil_water.py')
>>> steep_tea = Task('steep_tea.py')
>>> dag = DAG()
>>> dag.add_dependency(steep_tea, depends_on=boil_water)
-
add_task(task: dequindre.Task) → None
Add a task to the set of tasks.
Parameters: task (Task) – A Task object.
-
add_tasks(tasks: set) → None
Add multiple tasks to the set of tasks.
Parameters: tasks (set of Task) – Tasks to be added to the DAG.
-
get_downstream() → dict
Return an adjacency dict of downstream Tasks.
Returns: A dict mapping each Task to the set of its downstream Tasks.
Return type: dict of Task to set of Task
-
get_sinks() → set
Return the set of sink Tasks (Tasks with no downstream dependencies).
Returns: set of Task
-
get_sources() → set
Return the set of source Tasks (Tasks with no upstream dependencies).
Returns: set of Task
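As a rough illustration of the source and sink definitions above, the sketch below derives both sets from a downstream adjacency dict. Tasks are plain strings and the variable names are hypothetical; nothing here is taken from dequindre's internals.

```python
# Illustrative only: tasks are strings, `downstream` maps a task to the
# set of tasks that depend on it.
tasks = {"boil_water", "prep_infuser", "steep_tea", "drink_tea"}
downstream = {
    "boil_water": {"steep_tea"},
    "prep_infuser": {"steep_tea"},
    "steep_tea": {"drink_tea"},
}

# A task has an upstream dependency if it appears as some edge's target.
has_upstream = set().union(*downstream.values())
# A task has a downstream dependency if it has at least one outgoing edge.
has_downstream = {task for task, targets in downstream.items() if targets}

sources = tasks - has_upstream    # nothing needs to run before these
sinks = tasks - has_downstream    # nothing runs after these
```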
-
get_upstream() → dict
Return an adjacency dict of upstream Tasks.
Returns: A dict mapping each Task to the set of its upstream Tasks.
Return type: dict of Task to set of Task
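The upstream and downstream adjacency dicts describe the same edges from opposite ends, so one can be derived from the other. The following is a minimal sketch with plain strings as tasks; the helper `invert` is hypothetical and is not part of dequindre.

```python
from collections import defaultdict

def invert(adjacency):
    """Flip every edge of an adjacency dict (node -> set of neighbours)."""
    inverted = defaultdict(set)
    for node, neighbours in adjacency.items():
        for neighbour in neighbours:
            inverted[neighbour].add(node)
    return dict(inverted)

# Downstream view: each task maps to the tasks that wait on it.
downstream = {"boil_water": {"steep_tea"}, "prep_infuser": {"steep_tea"}}
# Upstream view: each task maps to the tasks it waits on.
upstream = invert(downstream)
```

Inverting twice returns the original dict, provided no entry has an empty set of neighbours.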
-
is_cyclic() → bool
Detect if the DAG is cyclic.
Returns: True if cycle detected. False otherwise.
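One common way to detect a cycle is Kahn's algorithm: repeatedly remove tasks that have no remaining upstream tasks, and report a cycle if some tasks can never be removed. The sketch below is an assumption about how such a check could work, not a description of how dequindre implements `is_cyclic`; the function name `has_cycle` is hypothetical.

```python
def has_cycle(tasks, downstream):
    """Return True if the graph contains a cycle (Kahn's algorithm).

    `tasks` is a set of hashable nodes; `downstream` maps a node to the
    set of nodes that depend on it. Every edge target must be in `tasks`.
    """
    # Count incoming edges for each task.
    indegree = {task: 0 for task in tasks}
    for targets in downstream.values():
        for target in targets:
            indegree[target] += 1

    ready = [task for task, degree in indegree.items() if degree == 0]
    visited = 0
    while ready:
        task = ready.pop()
        visited += 1
        for target in downstream.get(task, ()):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    # Tasks stuck with a non-zero in-degree sit on (or behind) a cycle.
    return visited < len(tasks)

tea = {"make_tea", "pour_tea", "drink_tea"}
chain = {"make_tea": {"pour_tea"}, "pour_tea": {"drink_tea"}}
loop = {**chain, "drink_tea": {"make_tea"}}
```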
-
remove_task(task: dequindre.Task) → None
Remove a task from the set of tasks and remove any related edges.
Parameters: task (Task) – A task to be removed from the DAG.
-
remove_tasks(tasks: set) → None
Remove multiple tasks from the set of tasks and remove any related edges.
Parameters: tasks (set of Task) – Tasks to be removed from the DAG.
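Removing a task means dropping both its outgoing edges and every edge that points at it. The sketch below shows that idea on plain containers; the function `drop_task` is hypothetical, and whether dequindre also prunes entries left with no targets is an assumption made here for tidiness.

```python
def drop_task(tasks, edges, task):
    """Remove `task` and every edge that touches it, in place."""
    tasks.discard(task)
    edges.pop(task, None)              # outgoing edges
    for source in list(edges):
        edges[source].discard(task)    # incoming edges
        if not edges[source]:
            del edges[source]          # assumption: prune empty entries

tasks = {"boil_water", "steep_tea", "drink_tea"}
edges = {"boil_water": {"steep_tea"}, "steep_tea": {"drink_tea"}}
drop_task(tasks, edges, "steep_tea")
```

After the call, the two remaining tasks are no longer connected.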
-
-
class dequindre.Dequindre(dag: dequindre.DAG)
Bases: object
The Dequindre scheduler handles all the scheduling computations.
-
dag
DAG – A copy of the originally supplied DAG. This attribute is trimmed while planning the schedule.
-
original_dag
DAG – The originally supplied DAG. Used to refresh dag after the schedule is planned.
-
get_schedules() → Dict[int, Set[dequindre.Task]]
Schedule tasks by priority level.
Returns: A dict mapping each priority level to the set of Tasks scheduled at that level. For example, make_tea -> pour_tea -> drink_tea will give the dict {1: {make_tea}, 2: {pour_tea}, 3: {drink_tea}}.
Return type: dict of int to set of Task
-
get_task_schedules() → Dict[dequindre.Task, int]
Define the schedule priority level for each task.
Returns: A dict mapping each Task to its priority level. For example, make_tea -> pour_tea -> drink_tea will give the dict {make_tea: 1, pour_tea: 2, drink_tea: 3}.
Return type: dict of Task to int
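The two return shapes above can be reproduced with a small level-by-level sketch: at each step, every task whose upstream tasks have all been scheduled joins the current priority level. This is one plausible way to compute such levels and is not claimed to be dequindre's algorithm; `plan_levels` and the string task names are hypothetical.

```python
def plan_levels(tasks, upstream):
    """Group tasks into priority levels, starting at 1.

    `upstream` maps a task to the set of tasks it depends on.
    """
    remaining = set(tasks)
    levels = {}
    level = 1
    while remaining:
        # A task is ready once none of its upstream tasks are still pending.
        ready = {t for t in remaining if not (upstream.get(t, set()) & remaining)}
        if not ready:
            raise ValueError("cycle detected: no task is ready to run")
        levels[level] = ready
        remaining -= ready
        level += 1
    return levels

tasks = {"make_tea", "pour_tea", "drink_tea"}
upstream = {"pour_tea": {"make_tea"}, "drink_tea": {"pour_tea"}}

levels = plan_levels(tasks, upstream)
# Flip the mapping to get one priority level per task.
task_levels = {task: level for level, group in levels.items() for task in group}
```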
-
refresh_dag() → None
Reset dag to a deepcopy of original_dag.
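The reason for keeping both dag and original_dag can be illustrated with the standard library's `copy.deepcopy`: planning trims a working copy, and a fresh deep copy restores it without the original ever being touched. The dict layout below is a stand-in for illustration and does not reflect the DAG class's real attributes.

```python
import copy

# Stand-in for a DAG: a dict holding a task set and an edge dict.
original_dag = {
    "tasks": {"make_tea", "pour_tea"},
    "edges": {"make_tea": {"pour_tea"}},
}

# Work on a deep copy so that trimming never reaches the original.
dag = copy.deepcopy(original_dag)
dag["tasks"].discard("make_tea")
dag["edges"].pop("make_tea")
trimmed_tasks = set(dag["tasks"])

# "Refresh": throw the trimmed copy away and start from the original again.
dag = copy.deepcopy(original_dag)
```

A shallow copy would not be enough here, because the nested sets would still be shared between the copy and the original.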
-
run_task(task: dequindre.Task) → None
Run the Python file defined by Task.loc in the environment defined by Task.env.
Parameters: task (Task) – The task to be run.
-
run_tasks(error_handling: str = 'soft') → None
Run all tasks on the DAG.
Parameters: error_handling (str) – Either ‘soft’ or ‘hard’. ‘hard’ error handling will abort the schedule after the first error.
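The difference between the two modes can be sketched with ordinary callables standing in for tasks. The text only states what 'hard' does; that 'soft' records the failure and carries on is an assumption here, and `run_all` is a hypothetical helper rather than dequindre's `run_tasks`.

```python
def run_all(jobs, error_handling="soft"):
    """Run (name, callable) pairs in order and return the names that failed.

    Assumption: 'soft' records a failure and continues with the next job;
    'hard' re-raises the first error, which aborts the remaining jobs.
    """
    if error_handling not in ("soft", "hard"):
        raise ValueError("error_handling must be 'soft' or 'hard'")
    failed = []
    for name, job in jobs:
        try:
            job()
        except Exception:
            if error_handling == "hard":
                raise
            failed.append(name)
    return failed

ran = []

def spill_tea():
    raise RuntimeError("spilled")

jobs = [
    ("make_tea", lambda: ran.append("make_tea")),
    ("pour_tea", spill_tea),
    ("drink_tea", lambda: ran.append("drink_tea")),
]

failed_soft = run_all(jobs, "soft")
ran_soft = list(ran)

ran.clear()
try:
    run_all(jobs, "hard")
    hard_aborted = False
except RuntimeError:
    hard_aborted = True
ran_hard = list(ran)
```

In soft mode the job after the failure still runs; in hard mode it never starts.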
-