When it comes to data science development, a Jupyter Notebook is often the easiest way to go. When the time comes to convert development code into a production pipeline, however, getting a notebook production-ready can be tricky!
Often, a notebook contains a lot of exploratory code that shouldn’t go into the pipeline – in practice, this can be 80% or more of the notebook! Additionally, it can be difficult to collect the final dependencies that are needed for a particular pipeline once the code has been cleaned up. It can also be difficult to unravel the code into the discrete steps that a pipeline depends on. Finally, manually converting those discrete steps into components in a data pipeline requires familiarity with an orchestrator like Airflow, which can take months to learn!
With LineaPy, you can automate the process of taking a notebook and turning it into a data pipeline, speeding up the production process and removing human error. Instead of manually setting up your pipeline by cleaning up your code, identifying dependencies, and building an Airflow pipeline, you can let LineaPy take care of that for you.
In this article, we’ll cover how you can use LineaPy to auto-generate data pipelines, as well as how it works behind the scenes!
Let’s start by looking at what auto-generating a data pipeline with LineaPy looks like in practice! In this example, we’ll work with the Iris dataset to generate two pipeline results, or artifacts, each of which shares some code in common.
First, we import the necessary Python libraries and then read in the Iris dataset:
import lineapy
import pandas as pd

# Tag the session so its artifacts are easy to find later
lineapy.tag("Data Pipeline Tutorial")

# Load data
df = pd.read_csv("https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv")
In the next block of code, we’ll calculate the differences between the length of the sepal and petal for each flower, followed by the differences between the widths. Then, we’ll calculate the sums of the same measurements. Finally, we’ll print out the head of each of these outputs so we can spot-check our data:
# Calculate the differences between sepal and petal measurements
length_diff = df['sepal.length'] - df['petal.length']
width_diff = df['sepal.width'] - df['petal.width']
length_diff.head()
width_diff.head()
# Calculate the sum of sepal and petal measurements
length_sum = df['sepal.length'] + df['petal.length']
width_sum = df['sepal.width'] + df['petal.width']
length_sum.head()
width_sum.head()
After examining our data, we decide the differences between sepal and petal measurements are useful,
so we add this data to our DataFrame.
# Assign the length and width differences to the DataFrame
df['length_diff'] = length_diff
df['width_diff'] = width_diff
We decide to proceed no further with the sum data, making that code irrelevant for the rest of the pipeline. We don’t need to remove it from our notebook, however – LineaPy will take care of that for us when we produce our pipeline!
For our final outputs, we decide we’d like to generate the average difference in lengths by flower types,
as well as the average difference in widths:
# Calculate average difference between sepal and petal length for all flower types
avg_length_diff_setosa = df.query("variety == 'Setosa'")["length_diff"].mean()
avg_length_diff_virginica = df.query("variety == 'Virginica'")["length_diff"].mean()
avg_length_diff_versicolor = df.query("variety == 'Versicolor'")["length_diff"].mean()
avg_length_diff = {
    'Setosa': avg_length_diff_setosa,
    'Versicolor': avg_length_diff_versicolor,
    'Virginica': avg_length_diff_virginica
}
# Calculate average difference between sepal and petal width for all flower types
avg_width_diff_setosa = df.query("variety == 'Setosa'")["width_diff"].mean()
avg_width_diff_virginica = df.query("variety == 'Virginica'")["width_diff"].mean()
avg_width_diff_versicolor = df.query("variety == 'Versicolor'")["width_diff"].mean()
avg_width_diff = {
    'Setosa': avg_width_diff_setosa,
    'Versicolor': avg_width_diff_versicolor,
    'Virginica': avg_width_diff_virginica
}
From the workflow above, we have created two important statistics, avg_length_diff and avg_width_diff, and we want to save them as LineaPy artifacts. To do so, we simply invoke the save API:
lineapy.save(avg_length_diff, 'avg_length_diff')
lineapy.save(avg_width_diff, 'avg_width_diff')
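A nice side benefit: once saved, an artifact can be retrieved later, in the same or a different session, through the get API. As a quick sketch (assuming the default LineaPy setup):
artifact = lineapy.get('avg_length_diff')
artifact.get_value()  # the saved dictionary of average length differences
artifact.get_code()   # the cleaned-up code that produced it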
The save calls above produce two LineaArtifacts named 'avg_length_diff' and 'avg_width_diff'. Now, we’re ready to auto-generate a pipeline for creating those artifacts. This is a simple task that we can accomplish in just one line of code:
lineapy.to_pipeline(['avg_length_diff', 'avg_width_diff'])
When we generate our pipeline with the lineapy.to_pipeline() API call, it produces three output files: a Python file containing the cleaned-up code, a text file listing the requirements, and a Dockerfile for building the pipeline. The output will look something like this:
Generated module file: avg_length_diff_avg_width_diff_module.py
Generated requirements file: avg_length_diff_avg_width_diff_requirements.txt
Generated Docker file: avg_length_diff_avg_width_diff_Dockerfile
PosixPath('.')
So now that we have these output files, let’s look deeper into what each file contains!
This is the Python file that contains all of the cleaned-up code that pipeline generation produces. It includes only the minimum code needed to generate the saved artifacts: LineaPy removes any irrelevant code, such as the code that generates the sums of the petal and sepal measurements above, and modularizes the remaining code into individual tasks.
In this example, the file contains the following code:
import pandas as pd


def get_df_for_artifact_avg_width_diff_and_downstream():
    df = pd.read_csv(
        "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
    )
    length_diff = df["sepal.length"] - df["petal.length"]
    width_diff = df["sepal.width"] - df["petal.width"]
    df["length_diff"] = length_diff
    df["width_diff"] = width_diff
    return df


def get_avg_width_diff(df):
    avg_width_diff_setosa = df.query("variety == 'Setosa'")["width_diff"].mean()
    avg_width_diff_virginica = df.query("variety == 'Virginica'")["width_diff"].mean()
    avg_width_diff_versicolor = df.query("variety == 'Versicolor'")["width_diff"].mean()
    avg_width_diff = {
        "Setosa": avg_width_diff_setosa,
        "Versicolor": avg_width_diff_versicolor,
        "Virginica": avg_width_diff_virginica,
    }
    return avg_width_diff


def get_avg_length_diff(df):
    avg_length_diff_setosa = df.query("variety == 'Setosa'")["length_diff"].mean()
    avg_length_diff_virginica = df.query("variety == 'Virginica'")["length_diff"].mean()
    avg_length_diff_versicolor = df.query("variety == 'Versicolor'")[
        "length_diff"
    ].mean()
    avg_length_diff = {
        "Setosa": avg_length_diff_setosa,
        "Versicolor": avg_length_diff_versicolor,
        "Virginica": avg_length_diff_virginica,
    }
    return avg_length_diff


def run_session_including_avg_width_diff():
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import copy

    artifacts = dict()
    df = get_df_for_artifact_avg_width_diff_and_downstream()
    avg_width_diff = get_avg_width_diff(df)
    artifacts["avg_width_diff"] = copy.deepcopy(avg_width_diff)
    avg_length_diff = get_avg_length_diff(df)
    artifacts["avg_length_diff"] = copy.deepcopy(avg_length_diff)
    return artifacts


def run_all_sessions():
    artifacts = dict()
    artifacts.update(run_session_including_avg_width_diff())
    return artifacts


if __name__ == "__main__":
    # Edit this section to customize the behavior of artifacts
    artifacts = run_all_sessions()
    print(artifacts)
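Because the generated module exposes each pipeline step as a plain Python function, we can also import and reuse those steps outside the pipeline. Here’s a hypothetical example of doing so with the generated file above:
from avg_length_diff_avg_width_diff_module import (
    get_df_for_artifact_avg_width_diff_and_downstream,
    get_avg_length_diff,
    get_avg_width_diff,
)

# Rebuild the shared DataFrame once, then compute both artifacts from it
df = get_df_for_artifact_avg_width_diff_and_downstream()
print(get_avg_length_diff(df))
print(get_avg_width_diff(df))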
This is the requirements file, which lists the minimal set of Python libraries and versions needed at runtime so that the environment can be configured properly. With this file, we can ensure that the pipeline runs correctly on a different machine.
In this example, the requirements file is simply
pandas==1.3.5
This is the Dockerfile that is responsible for building the pipeline as a whole. It ties everything together, using the tasks and dependencies that LineaPy has identified to build the pipeline within the target framework. The Dockerfile configures the environment for the data engineering pipeline and installs all of the dependencies listed in the requirements file.
In this example, the Dockerfile contains the following code:
FROM python:3.8
RUN mkdir /tmp/installers
WORKDIR /tmp/installers
# Copy all the requirements to run current DAG
COPY ./avg_length_diff_avg_width_diff_requirements.txt ./
# Install required libs
RUN pip install -r ./avg_length_diff_avg_width_diff_requirements.txt
WORKDIR /home
COPY ./avg_length_diff_avg_width_diff_module.py ./
ENTRYPOINT [ "python", "/home/avg_length_diff_avg_width_diff_module.py" ]
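By default, to_pipeline generates a plain Python pipeline like the one above. If we’d rather target an orchestrator directly, the same call takes a few extra arguments. A hedged sketch, with argument names as given in the LineaPy docs (the exact set may vary by version):
lineapy.to_pipeline(
    ['avg_length_diff', 'avg_width_diff'],
    framework='AIRFLOW',                 # target orchestrator instead of the default plain-Python script
    pipeline_name='iris_diff_pipeline',  # hypothetical name for the generated pipeline
    output_dir='./pipelines',            # where to write the generated files
)
When an orchestrator is specified, LineaPy also emits a framework-specific DAG definition alongside the module, requirements, and Docker files.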
Under the hood, LineaPy simplifies pipeline generation by converting the code into a graph. After LineaPy creates the graph, it segments the graph into individual tasks: it walks through the graph and extracts the common components required by each artifact, making it possible to isolate the tasks needed to generate each specific artifact. You can visualize this process with the following image:
Since the target artifacts A1 and A2 require some of the same code, the `lineapy.to_pipeline()` call extracts that shared code. This code is then stored as individual tasks that can each be executed independently.
After we create the LineaPy graph for a pipeline, we can easily view it using the `visualize()` API. For example, let’s look at the graph generated from a code snippet that involves training a regression model:
In this diagram, we can see the relationships between the arguments, functions, and libraries used in this code. At the top, we can see the process of reading in the training data, `data/sample_train_data.csv`, and toward the bottom, we can see that it’s used to train a `LogisticRegression` model from the `sklearn.linear_model` library.
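Producing a visualization like this is typically a one-liner. A minimal sketch, assuming we’re in the notebook session where the model-training code has already run:
import lineapy

# Render the LineaPy graph for the current session (displays inline in a notebook)
lineapy.visualize()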
For a detailed overview of the graph generation process, you can check out a walkthrough here.
Rather than functioning entirely independently, tasks often rely on input from other tasks. To allow for this, LineaPy also makes it possible to pass information between tasks in a few different ways.
First, data dependencies between tasks can be stored in memory and then passed as input arguments to the given downstream task. You can see an example of this in the code snippet below, where downstream functions depend on the trained model mod:
def get_df():
    df = load_and_preprocess_data(type="train")
    return df

def get_mod(df):
    mod = train_model(df)
    return mod

def get_pred(mod):
    df_new = load_and_preprocess_data(type="predict")
    pred = make_predictions(mod, df_new)
    return pred
When we don’t want to store arguments in memory, we can also pass data between tasks via disk: we write the output of a task to disk and then read the values back into memory when a downstream task needs them. This approach might look like this:
import pickle

def task_get_df():
    df = get_df()
    pickle.dump(df, picklefile_df)

def task_get_mod():
    df = pickle.load(picklefile_df)
    mod = get_mod(df)
    pickle.dump(mod, picklefile_mod)

def task_get_pred():
    mod = pickle.load(picklefile_mod)
    pred = get_pred(mod)
    pickle.dump(pred, picklefile_pred)
Because this graph structure makes it easy to pass arguments to a task as needed, it also makes it simple to swap out graph parameters. To accomplish this, you specify the name of the variable to be turned into a parameter, and LineaPy creates a pipeline that accepts that value as an input. If you’re interested in learning more about graph parameters, we describe them in more detail in the documentation.
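For example, if the notebook had assigned the data location to a variable, we could ask LineaPy to expose it as a pipeline parameter. A rough sketch, using the input_parameters argument described in the LineaPy parametrization docs (the variable name here is hypothetical):
# Suppose the notebook contained a literal assignment such as:
#   url = "https://raw.githubusercontent.com/LineaLabs/lineapy/main/examples/tutorials/data/iris.csv"
lineapy.to_pipeline(
    ['avg_length_diff', 'avg_width_diff'],
    input_parameters=['url'],  # expose 'url' as a runtime parameter of the generated pipeline
)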
As you’ve seen, LineaPy is designed so that task and dependency extraction are not specific to any framework. This makes it easy to leverage LineaPy’s ability to modularize code and create graphs across various frameworks. While we’ve primarily focused on Airflow here, we’ve also recently added support for Ray, Kubeflow, Argo, and DVC integrations! If there are more integrations that you’d like to see, you can also build them yourself! For more information on how to build custom integrations, check out our contributor guide as well as our guide to writing custom integrations.