In this post, we are going to demonstrate how to use LineaPy to build a small app that tells us whether we should refill our gas tank or not, based on the analysis in Sean J. Taylor's Forecasting gas prices demo (disclosure: he is a LineaPy advisor).
The application includes two components:
1. An ETL job that downloads the latest gas price data from EIA and transforms it for downstream use. Since the data frequency is weekly, the ETL job should be scheduled weekly as well.
2. An API that forecasts next week's gas price for a given region and tells us whether to refill now. If the forecast price goes up, we should fill our tank right now; if the price goes down, we should try to wait until next week (see the sketch below).
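A minimal sketch of that decision rule, where price_change is the forecast price minus the current price (the quantity we will compute and save as a LineaPy artifact later in this post; should_refill is only for illustration and is not part of the app code below):
def should_refill(price_change: float) -> bool:
    # Refill now if the price is expected to rise; otherwise wait until next week
    return price_change > 0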
We will use lineapy.to_pipeline to generate a pipeline in both cases, demonstrate how to customize each pipeline for our goal, and reuse the refactored functions included in the Python module created by LineaPy.
First, we are going to follow the exact steps in the original blog post to read the weekly gas price from EIA. Then we will transform the raw data for downstream processing, save the transformed data as a LineaPy artifact, and use lineapy.to_pipeline to create an Airflow DAG that updates the weekly price. Finally, we will demonstrate how to customize the Airflow DAG to save the artifact to the desired location.
# timeseries_app.ipynb
%load_ext lineapy
import lineapy
import requests
import pandas as pd
import numpy as np
import re
# Download from EIA and transform
response = requests.get("https://www.eia.gov/petroleum/gasdiesel/xls/pswrgvwall.xls")
df = pd.read_excel(
    response.content,
    sheet_name="Data 12",
    index_col=0,
    skiprows=2,
    parse_dates=["Date"],
).rename(
    columns=lambda c: re.sub(
        r"\(PADD 1[A-C]\)",
        "",
        c.replace("Weekly ", "").replace(
            " All Grades All Formulations Retail Gasoline Prices (Dollars per Gallon)",
            "",
        ),
    ).strip()
)
df_long = (
    df.reset_index()
    .melt(id_vars=["Date"], var_name="region", value_name="price")
    .rename(columns={"Date": "week"})
    .sort_values(["region", "week"])
    .assign(
        # if we're missing one value, just use the last value
        # (happens twice)
        price=lambda x: x["price"].combine_first(x.groupby("region")["price"].shift(1)),
        # we'll forecast log(price) and then transform
        log_price=lambda x: np.log(x["price"]),
        # percentage price changes are approximately the difference in log(price)
        price_change=lambda x: (
            x["log_price"] - x.groupby("region")["log_price"].shift(1)
        ),
    )
    .query("price == price")  # filter out NAs
)
# Save the transformed data as an artifact
lineapy.save(df_long, 'weekly_gas_price_data')
# Create an Airflow Pipeline that is scheduled weekly
lineapy.to_pipeline(['weekly_gas_price_data'], framework='AIRFLOW', pipeline_dag_config={'schedule_interval':'@weekly'})
# In case you just want to see the final app without updating the weekly data
df_long.to_csv('weekly_gas_price_data_long.csv', index=False)
We can inspect the Airflow DAG file generated by LineaPy as follows:
# weekly_gas_price_data_dag.py
import pathlib
import pickle
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
import weekly_gas_price_data_module
def dag_setup():
    pickle_folder = pathlib.Path("/tmp").joinpath("weekly_gas_price_data")
    if not pickle_folder.exists():
        pickle_folder.mkdir()

def dag_teardown():
    pickle_files = (
        pathlib.Path("/tmp").joinpath("weekly_gas_price_data").glob("*.pickle")
    )
    for f in pickle_files:
        f.unlink()

def task_weekly_gas_price_data():
    df_long = weekly_gas_price_data_module.get_weekly_gas_price_data()
    pickle.dump(
        df_long, open("/tmp/weekly_gas_price_data/variable_df_long.pickle", "wb")
    )

default_dag_args = {
    "owner": "airflow",
    "retries": 2,
    "start_date": days_ago(1),
}

with DAG(
    dag_id="weekly_gas_price_data_dag",
    schedule_interval="@weekly",
    max_active_runs=1,
    catchup=False,
    default_args=default_dag_args,
) as dag:
    setup = PythonOperator(
        task_id="dag_setup",
        python_callable=dag_setup,
    )
    teardown = PythonOperator(
        task_id="dag_teardown",
        python_callable=dag_teardown,
    )
    weekly_gas_price_data = PythonOperator(
        task_id="weekly_gas_price_data_task",
        python_callable=task_weekly_gas_price_data,
    )
    setup >> weekly_gas_price_data
    weekly_gas_price_data >> teardown
Note that, by default, the pipeline generated by LineaPy does not persist the artifact object to a durable location such as a file on disk or a database; the intermediate pickle in /tmp is removed by the teardown task. Fortunately, we can easily modify the DAG file to decide where we want to save the artifact (the weekly gas price data). To keep the demo simple, we are going to save our artifact as a CSV file on disk as follows:
def task_weekly_gas_price_data():
    df_long = weekly_gas_price_data_module.get_weekly_gas_price_data()
    pickle.dump(
        df_long, open("/tmp/weekly_gas_price_data/variable_df_long.pickle", "wb")
    )
    df_long.to_csv('weekly_gas_price_data_long.csv', index=False)
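If we preferred a database over a flat file, a similar tweak works; for example, a sketch that writes the table to a local SQLite database (the database path and table name here are placeholders, not part of the generated DAG):
import pickle
import sqlite3

def task_weekly_gas_price_data():
    df_long = weekly_gas_price_data_module.get_weekly_gas_price_data()
    pickle.dump(
        df_long, open("/tmp/weekly_gas_price_data/variable_df_long.pickle", "wb")
    )
    # Write the transformed data to a SQLite table instead of a CSV file
    conn = sqlite3.connect("/tmp/gas_prices.db")
    df_long.to_sql("weekly_gas_price_data_long", conn, if_exists="replace", index=False)
    conn.close()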
Now, we’ve set up an Airflow job that updates the gas price weekly.
So far, we have set up a weekly job that keeps the gas price data up to date. In this section, we are going to use LineaPy to build a FastAPI app that tells us whether we should refill our gas tank or not based on our location.
First, we will read the pre-processed data from the weekly data transformation pipeline and follow the steps in the original blog post to forecast gas prices for a given region. Then, we will save the difference between the forecasted price and the current price as an artifact and use lineapy.to_pipeline to generate a parameterized pipeline that takes the region as an input parameter. Finally, we will write a FastAPI app using the Python module created by lineapy.to_pipeline.
Note that our FastAPI app should include the following two endpoints:
1. /regions/, which returns the list of regions with available gas price data.
2. /{region}/pricechange/, which returns the expected gas price change for the given region next week.
We can continue the flow of Sean’s blog post to train a model to forecast the gas price in a certain region and calculate the metric we are interested in to determine whether we should fill our tank or not.
# timeseries_app.ipynb
%load_ext lineapy
import lineapy
import pandas as pd
import numpy as np
from statsforecast.models import AutoARIMA
# Read csv file updated by the Airflow pipeline
df_long = pd.read_csv('weekly_gas_price_data_long.csv', parse_dates=["week"])
# Save all available regions as an artifact
all_regions = df_long["region"].unique().tolist()
lineapy.save(all_regions, "all_regions")
# Forecast future gas price for a region
H = 13
CI = 80
width = 300
height = 250
region = "U.S."
cutoff_date = pd.Timestamp.today().strftime("%Y-%m-%d")
region_df = df_long.query(f"region == '{region}'")
train = region_df.query(f"week <= '{cutoff_date}'")
m_aa = AutoARIMA()
m_aa.fit(train["log_price"].values)
raw_forecast = m_aa.predict(h=H, level=(CI,))
raw_forecast_exp = {key: np.exp(value) for key, value in raw_forecast.items()}
forecast = pd.DataFrame(raw_forecast_exp).assign(
    week=pd.date_range(train["week"].max(), periods=H, freq="W")
    + pd.Timedelta("7 days")
)
# Calculate the price difference and save it as an artifact
current_price = train.price.iloc[-1]
forecast_price = forecast['mean'].iloc[0]
price_change = forecast_price - current_price
lineapy.save(price_change, 'price_change')
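Saved artifacts can also be retrieved later from LineaPy's artifact store, which is handy for a quick check in a fresh session; a small sketch, assuming the default local store:
import lineapy

# Look up the saved artifact and materialize its value
price_change_artifact = lineapy.get('price_change')
print(price_change_artifact.get_value())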
Since lineapy.to_pipeline creates a pipeline by first generating a reusable Python module, we can actually use the same module for other purposes. Here, we are going to use it to build an app.
# Create a parameterized pipeline
lineapy.to_pipeline(['price_change', 'all_regions'], input_parameters=['region'])
Among the files this call generates is the following Python module:
# price_change_all_regions_module.py
import argparse
import numpy as np
import pandas as pd
from statsforecast.models import AutoARIMA
def get_df_long_for_artifact_all_regions_and_downstream():
    df_long = pd.read_csv("weekly_gas_price_data_long.csv", parse_dates=["week"])
    return df_long

def get_all_regions(df_long):
    all_regions = df_long["region"].unique().tolist()
    return all_regions

def get_price_change(df_long, region):
    H = 13
    CI = 80
    cutoff_date = pd.Timestamp.today().strftime("%Y-%m-%d")
    region_df = df_long.query(f"region == '{region}'")
    train = region_df.query(f"week <= '{cutoff_date}'")
    m_aa = AutoARIMA()
    m_aa.fit(train["log_price"].values)
    raw_forecast = m_aa.predict(h=H, level=(CI,))
    raw_forecast_exp = {key: np.exp(value) for key, value in raw_forecast.items()}
    forecast = pd.DataFrame(raw_forecast_exp).assign(
        week=pd.date_range(train["week"].max(), periods=H, freq="W")
        + pd.Timedelta("7 days")
    )
    current_price = train.price.iloc[-1]
    forecast_price = forecast["mean"].iloc[0]
    price_change = forecast_price - current_price
    return price_change

def run_session_including_all_regions(region="U.S."):
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import copy

    artifacts = dict()
    df_long = get_df_long_for_artifact_all_regions_and_downstream()
    all_regions = get_all_regions(df_long)
    artifacts["all_regions"] = copy.deepcopy(all_regions)
    price_change = get_price_change(df_long, region)
    artifacts["price_change"] = copy.deepcopy(price_change)
    return artifacts

def run_all_sessions(
    region="U.S.",
):
    artifacts = dict()
    artifacts.update(run_session_including_all_regions(region))
    return artifacts

if __name__ == "__main__":
    # Edit this section to customize the behavior of artifacts
    parser = argparse.ArgumentParser()
    parser.add_argument("--region", type=str, default="U.S.")
    args = parser.parse_args()
    artifacts = run_all_sessions(
        region=args.region,
    )
    print(artifacts)
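Before wiring this module into an app, we can sanity-check it directly, either by running python price_change_all_regions_module.py --region "Chicago" from the command line or by importing it; for example:
from price_change_all_regions_module import run_all_sessions

# Recompute both artifacts for a specific region
artifacts = run_all_sessions(region="Chicago")
print(artifacts["price_change"])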
With the Python module created by LineaPy, we no longer need to manually extract the code out of the Jupyter notebook. Since all the core analytic logic is already captured in the module, we only need to focus on the API design, which makes building the app straightforward:
# main.py
from fastapi import FastAPI
from price_change_all_regions_module import get_all_regions
from price_change_all_regions_module import get_price_change
from price_change_all_regions_module import get_df_long_for_artifact_all_regions_and_downstream
app = FastAPI()
@app.get("/regions/")
def all_regions():
df_long = get_df_long_for_artifact_all_regions_and_downstream()
all_regions = get_all_regions(df_long)
return '|'.join(all_regions)
@app.get("/{region}/pricechange/")
def price_change(region):
df_long = get_df_long_for_artifact_all_regions_and_downstream()
price_change = get_price_change(df_long, region)
return f"Gas price in {region} is expected to change {price_change:.2f} next week."
We can start the app we just built by running the following:
uvicorn main:app --reload
Then we can go to http://localhost:8000/regions/ to see the list of available regions, or to http://localhost:8000/Chicago/pricechange/ to see the forecast price change (assuming we are in Chicago). You can build your own apps on top of this; for example, the image at the top of this page is a heat-map generated from the predictions made by this app.
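As a rough sketch of how such a heat-map could be fed by this API, we can query every region in a loop (the base URL matches the local uvicorn default; the heat-map plotting itself is omitted):
import requests

BASE_URL = "http://localhost:8000"

# Ask the app which regions it knows about, then fetch the forecast message for each
regions = requests.get(f"{BASE_URL}/regions/").json().split("|")
for region in regions:
    message = requests.get(f"{BASE_URL}/{region}/pricechange/").json()
    print(message)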
Going from a prototyping Jupyter notebook to an application usually involves extracting the analytic or business logic from the notebook and refactoring it into reusable components that applications can consume. This process usually pushes either data scientists or engineers out of their comfort zone to do something they are not familiar with, so the iteration loop is slow and becomes a pain point for both sides. Because LineaPy can programmatically extract this logic into a Python module of reusable components, both data scientists and engineers can focus on what they are good at, improving the overall efficiency of turning prototype ideas into actionable insights.