dalymi

[data like you mean it]


A lightweight, data-focused and non-opinionated pipeline manager written in and for Python.


dalymi allows you to build data processing pipelines as directed acyclic graphs (DAGs) and facilitates rapid, but controlled, model development. The goal is to prototype quickly, but scale to production with ease.
To achieve this, dalymi uses "make"-style workflows: tasks with missing inputs trigger the execution of the input-producing tasks before being executed themselves. At the same time, dalymi provides fine control to run and undo specific parts of the pipeline for quick test iterations. This ensures output reproducibility and minimizes manual errors.

Several features support dalymi's goal:

  • simple, non-opinionated API (most choices left to user)
  • no external dependencies for pipeline execution
  • one-line installation (ready for use)
  • no configuration
  • auto-generated command line interface for pipeline execution
  • quick start, but high flexibility to customize and extend:
    • task output can be stored in any format Python can touch (local files being the default)
    • customizable command line arguments
    • templated output location (e.g. timestamped files)
    • support for automated checks on data integrity during runtime
  • DAG visualization using graphviz
  • API design encourages good development practices (modular code, defined data schemas, self-documenting code, easy workflow viz, etc.)

In dalymi, DAG nodes (or "tasks") are vanilla Python functions with a specific call signature. Task dependencies (DAG edges) are defined using function decorators. These decorators specify data resources as inputs and outputs of tasks. dalymi automatically builds a DAG by learning the dependencies between tasks based on their data input and output.

To learn more, check out the tutorial below, the recipes, or examples.


Installation

dalymi requires Python >= 3.5.

pip install dalymi

For the latest development version:

pip install git+https://github.com/joschnitzbauer/dalymi.git

DAG visualization requires graphviz.

Tutorial

dalymi programs consist of two main components: resources and tasks. Tasks are Python functions that create resources and/or require resources as input. Hence, a dalymi pipeline can be described as a directed graph of tasks and resources where tasks are dependent on their input resources.

In this tutorial, we will create the following simple DAG:

[Figure: the simple DAG built in this tutorial]

Note

The above image was generated with dalymi's dot command and graphviz: python examples/simple.py dot, followed by dot -Tpng pipeline.dot -o pipeline.png.

In this graph, ellipses and rectangles represent tasks and resources, respectively. The first row of each rectangle gives the name of the resource. If the resource is a tabular dataframe (e.g. a pandas.DataFrame), the remaining rows indicate the expected columns of this resource.

Hence, we can see that the task create_numbers produces a resource called numbers which has a single column called number. The numbers resource in turn is being used as an input by the task square_numbers. Finally, square_numbers produces an output resource called squares with columns number and square.

Upon execution of a DAG, dalymi attempts to run each task, but only executes it if its input resources exist. If an input resource does not exist, dalymi recursively attempts to execute the producing task first and revisits the waiting task later.
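
As a mental model, this resolution strategy can be sketched roughly as follows. The sketch is an illustration only, not dalymi's actual implementation; SketchResource, SketchTask and the produced_by mapping are hypothetical names used purely for this example:

class SketchResource:
    """Hypothetical stand-in for a dalymi resource (illustration only)."""

    def __init__(self, name):
        self.name = name
        self.available = False              # becomes True once a task has produced it

    def exists(self):
        return self.available


class SketchTask:
    """Hypothetical stand-in for a pipeline task (illustration only)."""

    def __init__(self, func, inputs=(), outputs=()):
        self.func = func
        self.inputs = list(inputs)
        self.outputs = list(outputs)


def resolve(task, produced_by):
    """Run `task`, recursively producing any missing inputs first ("make"-style)."""
    for resource in task.inputs:
        if not resource.exists():
            # The input is missing: execute the producing task before revisiting this one.
            resolve(produced_by[resource.name], produced_by)
    if not all(output.exists() for output in task.outputs):
        task.func()                         # skipped if the output already exists
        for output in task.outputs:
            output.available = True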

The full code to create the above pipeline is:

from dalymi import Pipeline
from dalymi.resources import PandasCSV
import pandas as pd


# Define resources:
numbers_resource = PandasCSV(name='numbers', loc='numbers.csv', columns=['number'])
squares_resource = PandasCSV(name='squares', loc='squares.csv', columns=['number', 'square'])


# Define the pipeline
pl = Pipeline()


@pl.output(numbers_resource)
def create_numbers(**context):
    return pd.DataFrame({'number': range(11)})


@pl.output(squares_resource)
@pl.input(numbers_resource)
def square_numbers(numbers, **context):
    numbers['square'] = numbers['number']**2
    return numbers


if __name__ == '__main__':
    # Run the default command line interface
    pl.cli()

Note

Further examples can be found on GitHub.

Let's go through the code bit by bit. First, the imports:

from dalymi import Pipeline
from dalymi.resources import PandasCSV
import pandas as pd

We are using a Pipeline object to represent the whole workflow and instances of PandasCSV for Pandas dataframe resources stored as CSV files. We also import pandas itself for later use.

Next, we define the resources in our pipeline:

numbers_resource = PandasCSV(name='numbers', loc='numbers.csv', columns=['number'])
squares_resource = PandasCSV(name='squares', loc='squares.csv', columns=['number', 'square'])

Here, we specify a unique name for each resource, the location where it should be stored (loc), and the columns we expect (columns).

Note

Column names are not required to define dataframe resources, but specifying them is highly encouraged for disciplined coding and self-documenting code. When columns are specified, the integrated dot command can visualize them for each resource, allowing for quick and easy documentation.
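
For example, a resource could be defined without the columns argument (a hypothetical resource, shown only for illustration), presumably skipping the column check and the column annotations in the dot output:

# Hypothetical resource without the optional `columns` argument:
raw_resource = PandasCSV(name='raw', loc='raw.csv')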

Now, we are ready to define the pipeline tasks:

pl = Pipeline()


@pl.output(numbers_resource)
def create_numbers(**context):
    return pd.DataFrame({'number': range(11)})


@pl.output(squares_resource)
@pl.input(numbers_resource)
def square_numbers(numbers, **context):
    numbers['square'] = numbers['number']**2
    return numbers

Pipeline tasks are simple Python functions (here, create_numbers and square_numbers), decorated with Pipeline.input, Pipeline.output, or both.
Positional arguments of the output decorator are matched to the objects returned by the task function. For example, in create_numbers, the returned dataframe is interpreted as numbers_resource. The output decorator checks the returned objects for data integrity (in this case: "do the columns match the expected ones?") and stores each object at its resource location (in this case: as a flat file called numbers.csv).
Positional arguments of the input decorator are loaded from their resource locations and injected into a context dictionary, with the resource name as key and the loaded data object as value. Hence, each pipeline task must accept this context as keyword arguments (**context). Declaring matching positional parameters on an input-receiving task function is a handy Python trick to extract those objects elegantly from the context (e.g. numbers is pulled directly from the context in the definition of square_numbers).

Warning

If a task has both decorators, the output decorator must wrap the input decorator (i.e. appear above it) to ensure correct pipeline functionality.
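
For illustration, here is a hypothetical third task extending the tutorial pipeline. The output decorator is applied above (wraps) the input decorator; passing several resources to one input decorator relies on the positional-argument behavior described above:

# Hypothetical extension of the tutorial pipeline (illustration only).
totals_resource = PandasCSV(name='totals', loc='totals.csv',
                            columns=['number', 'square', 'total'])


@pl.output(totals_resource)                      # outermost: validates and stores the return value
@pl.input(numbers_resource, squares_resource)    # inner: loads inputs and injects them via the context
def total_squares(numbers, squares, **context):
    totals = squares.copy()
    totals['total'] = totals['square'].cumsum()
    return totals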

dalymi provides an easy way to create command-line interfaces (CLIs) for pipelines. The default CLI is added to the above pipeline like this:

if __name__ == '__main__':
    # Run the default command line interface
    pl.cli()

Now, we can interact with the pipeline in the following way (assuming the above code is saved in a file called simple.py):

python simple.py run     # executes the pipeline. skips tasks for which output already exists.
python simple.py undo    # deletes any existing output resources of the pipeline.
python simple.py dot     # creates a dot file to create pipeline diagrams with graphviz.

Task execution can be controlled more finely with CLI arguments. For example:

python simple.py run --task square_numbers    # executes only `square_numbers` (and upstream requirements if needed).
python simple.py run -t square_numbers        # short for the line above
python simple.py ls                           # lists all tasks in the pipeline
python simple.py undo --task square_numbers   # deletes only the output of `square_numbers`
python simple.py undo -t square_numbers       # short for the line above
python simple.py undo -t square_numbers -d    # deletes the output of `square_numbers` and all downstream outputs
python simple.py undo -t square_numbers --downstream    # same as above

CLI commands and arguments are added to the context dictionary, so pipeline tasks can make use of them. Given the default CLI, the minimal context is:

{'command': '{command}'}  # where {command} is one of `run`, `undo`, `ls`, `dot`

Depending on optional arguments, the context may contain further CLI entries (keyed by the argument's long name), such as:

{'task': 'square_numbers',
 'downstream': True}
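
Pipeline functions can read these entries directly from the context. The following is a hypothetical variation of square_numbers that checks which command it was invoked under (illustration only):

@pl.output(squares_resource)
@pl.input(numbers_resource)
def square_numbers(numbers, **context):
    # `context` carries the CLI command and any optional arguments.
    if context.get('command') == 'run':
        print('square_numbers executed via the `run` command')
    numbers['square'] = numbers['number']**2
    return numbers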

Note

The CLI can be extended with custom, project-specific arguments (for example, an execution date). Custom arguments are automatically added to the context, so they are available to all pipeline functions. Moreover, resource locations can be specified as templates that are filled with context variables at runtime, which allows for argument-specific resource storage. See Recipes for further details.
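
As a rough sketch of the templating idea, assuming a custom date argument has been added to the CLI (see Recipes) so that a date entry is present in the context at runtime:

# Sketch only: the `{date}` placeholder in `loc` is assumed to be filled from a
# `date` entry in the runtime context, e.g. supplied by a custom `--date` CLI argument.
daily_numbers_resource = PandasCSV(name='numbers',
                                   loc='output/{date}/numbers.csv',
                                   columns=['number'])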