Customized Luigi Pipeline

To make better business decisions in the garden market, Lady H. needs periodic estimates of perfume sales. Whether it's about determining the quantity of flowers to harvest, planning marketing campaigns, introducing innovations in perfume design, or other matters, having reliable sales estimates is essential. To meet these needs, she developed the Luigi pipeline.

  • To forecast future sales, the process often go through Data Collection -> Feature Engineering -> Data Preprocessing -> Model Selection -> Model Evaluation, each step is called as a Luigi "task".

  • Data drift monitoring involves detecting abnormal changes in the data. The causes of data drift can vary, but they generally reduce forecasting accuracy. When data drift is detected, it's important to investigate further to identify the underlying reasons.

In this Luigi pipeline,

  • The configurable parameters for each task are defined in the config file. Users can modify these parameters to adjust the pipeline as needed.

  • Functions in helpers file can be shared across multiple Luigi tasks.

  • It is a good practice to include both unit tests and integration tests for the pipeline. Unit tests evaluate individual functions, while integration tests assess how multiple functions work together. Whenever the pipeline code is modified, these tests help ensure that the changes do not disrupt other parts of the code.

The command center is run.py, once Lady H. decides to run either forecasting or data drift monitoring, she just needs to hit the button and the pipeline will be executed automatically.

Run Luigi Pipeline

Let's start by reviewing the code in run.py. Here, users specify which task or pipeline to execute. To make the configuration parameters accessible to all Luigi tasks, you need to load the config file and pass it as a task parameter, like the config shown in the code. Additionally, the task specified here only needs to be the final task of the pipeline, like ModelEvaluationTask in this example, this is because Luigi manages task dependencies, it can automatically trace back to previous tasks from the final one.

๐ŸŒป Check run.py config >>

๐ŸŒป Check run.py code >>

Task - Data Collection

This is the first task of the forecasting pipeline, the purpose is to merge all the stores' data and the sales data into 1 file and save it for later tasks.

  1. In Luigi, every task is defined as a class that inherits from luigi.Task.

  2. Config file is accessible to Luigi tasks, since it's been passed as a task parameter in run.py.

  3. Each Luigi task has an output() function to define the location of the output. This is helpful when there is an interruption of the execution, then luigi can re-run from where it stopped.

  1. Each task has a run() function to define the core logic.

  2. Since the output() function doesn't automatically save the output, users need to save the output through run() function.

๐ŸŒป Check data_collection config >>

๐ŸŒป Check data_collection.py code >>

Task - Feature Engineering

Feature engineering is a creative process. In addition to using existing data columns as features, users can generate new features to enhance forecasting performance. This process of creating new features is known as "feature engineering".

To create new features effectively, it's often worthwhile to first explore the data for deeper insights. Here are some basic methods that Lady H. frequently uses during her data exploration.

Data Exploration - Univariate Analysis

Univariate analysis is to look at the statistics of each single data column, such as checking feature distribution and target distribution. This type of analysis provides an overall view of the data. For example:

We can examine the distribution of the forecast target. From the sales plot, we observe that some perfume sales are zero. Additionally, a large portion of sales falls within the range of [100, 10,000], with the distribution exhibiting a long tail of high-value sales. This provides a starting point for further exploration to uncover potential underlying factors, which could help feature engineering.

The distributions of categorical feature can be plotted as bar charts, so that we can see the comparison between all the values of a feature. Such as in feature "StoreType", type 1 occupies a much smaller population than other store types; in feature "Month", there is less records between August and December, comparing with other months.

It's common to see the distributions of numerical features look like skewed normal distribution such as "Customers" distribution below, or look like skewed normal distribution with bumps such as "CompetitionDistance" distribution below. It is also common to see a very long tail in these distributions, and we can try out binning the feature values, so that a numerical feature will be converted to a categorical feature.

๐ŸŒป Check data exploration details >>

Data Exploration - Bivariate Analysis

Bivariate analysis explores the relationship between two variables, such as the relationship between two features or between a feature and the target. During feature engineering, Lady H. typically starts by examining the distribution of each feature against the target. If a feature's values can effectively distinguish between different target values, it is more likely to enhance the forecast.

For example, from the univariate analysis above, we are seeing the distribution of "Customers" is showing a large bump before 3000 and a long tail after 3000, then what does the sales distributions look like for "Customers < 3000" and "Customers >= 3000"? The answer is shown below, and there is an obvious sales difference between these 2 groups. When the number of customers are larger than 3000, there is higher sales. Therefore, we can create a new feature that simply divides feature "Customers" into 2 bins, "Customers < 3000" and "Customers >= 3000".

We can also look at sales distributions for categorical features, such as the sales of StoreType 1 appears to be more different from other store types, so that we can create a new feature to indicate whether the StoreType is 1 or not; the sales distribution of each year looks quite similar, indicating feature "Year" may not be an important feature.

๐ŸŒป For more insights, check the full data exploration details >>

Feature Engineering Pipeline Code

In the pipeline, we decide which new features to add through feature_adding_dct in the config file. Then feature engineering task will call respective functions from the helpers file.

In this example, Lady H. was adding 2 functions:

  • add_date_feature() is to add Year, Month, Quarter as new features, they were all generated from the "date" column. Although "Year" may not be a good feature as we saw in above bivariate analysis, it is common to generate these time elements as new features when we have a date column.

  • add_threshold_grouping_features() is to generate a binary feature based on specified thereshold. In this case, feature "Customers" was used to generate feature "Customers_larger_then_3000", which indicates whether the customers amount is larger than 3000.

๐ŸŒป Check feature engineering config >>

๐ŸŒป Check feature engineering task >>

๐ŸŒป Check feature engineering helpers >>

You might have noticed function requires() in feature engineering task. It builds the dependency between tasks. Because feature engineering can only be executed after finishing data collection task, by indicating this relationship through requires(), Luigi will know the order of the tasks' execution.

Task - Data Preprocessing

This step allows you to perform any data operations needed to ensure the dataset is ready for model training.

In Lady H.'s case, she simply needed to convert certain features to the "category" type. This is because models like LightGBM (LGBM) can automatically handle categorical features when they are specified as such.

  • le_col refers to a feature containing a mix of numerical and string values. Since models cannot interpret this data type correctly, it would cause errors. Therefore, Lady H. first applied label encoding to convert the values into integers. She later converted the feature to "category" type so that the values won't bring order to the model.

  • int_cat_col refers to a categorical feature initially represented as integers. These can be directly converted to the "category" type without additional processing.

๐ŸŒป Check data preprocessing config >>

๐ŸŒป Check data preprocessing task >>

Task - Model Selection

Itโ€™s time to fit the model using our preprocessed data. In an automated machine learning pipeline, itโ€™s beneficial to enable model selection so the pipeline can adapt and choose the optimal model as the dataset evolves. As demonstrated in mini pipelines, MLJar is an effective tool for model selection. Therefore, for the model selection task here, you can create a configurable MLJar AutoML instance to fit the data.

๐ŸŒป Check model selection config >>

๐ŸŒป Check model selection task >>

Task - Model Evaluation

After fitting the selected model with the training data, time to evaluate the model performance on the testing data.

In this model evaluation task, R2 score is expected to be used for this regression problem. At the same time, Lady H. has generated the lower bound and the upper bound of the model confidence interval. But what is "model confidence interval"?

The confidence interval (CI) of a model reflects how confident we are that the testing performance score represents the model's true performance.

For example, Lady H. applied model CI on this regression problem and got the value range between [0.936, 0.939], the sepcified confidence level is 95%, indicating that, if the testing performance (R2 score in this case) is within this value range, then there is 95% likelihood that the model performance is true.

Similarly, in a classification example below, if the testing model performance (balanced accuracy in this case) is within [0.790, 0.907], then there is 95% likelihood that the performance is true.

๐ŸŒป Check model confidence details >>

The core logic of calculating the model confidence interval can be summarized into 2 steps:

  1. Bootstrap samples from the testing data and get the model performance of each sample, collecting them into a list.

  1. With user specified confidence level and the performance list got from step 1, the confidence interval is calculated as Step 2 below:

In the Luigi pipeline, Lady H. only needs to move the core logic for calculating the confidence interval into a helper function, which can then be called by the model evaluation task.

๐ŸŒป Check model evaluation config >>

๐ŸŒป Check model evaluation task >>

๐ŸŒป Check model evaluation_helpers >>

Task - Data Drift Monitoring

Lady H.'s first full-time role as a data scientist was with a financial fraud detection company, where one of her main clients was a giant bank. Every two months, the client sent her new data for fraud analysis.

One December afternoon, the office was nearly empty by 5 p.m., with most of her colleagues either on vacation or having left early. Determined to wrap up the client's fraud report using her trained model, Lady H. planned to finish quickly and head home.

But as she processed the new data, the fraud detection rate came out much lower than usual. "Why is this happening? This looks strangeโ€”I need to dig deeper," she thought. After some investigation, she realized the problem: the client had accidentally sent the wrong dataset, causing the unexpected result.

This is just one of many stories about data drift.

About Data Drift

Data drift isnโ€™t always the result of mistakesโ€”there can be other reasons as well. Regardless of the cause, changes in the data can reduce the predictive power of a trained model. When this happens, data scientists must investigate the underlying causes and may even need to retrain the model.

There are 2 main types of data drift:

  • Concept Drift means, the statistical properties of the forecasting target have changed.

  • Covariate Drift means, the statistical properties of the input features have changed.

Suggested Data Drift Detection Methods

There are many statistical methods to detect data drift, some need to satisfy certain assumptions, some aren't as effective as expected, some python built-in libraries set constraints on the data input. After trying out different methods, Lady H. suggested 2 methods she often uses:

  • To detect concept drift: use PSI (Population Stability Index).

  • To detect covariate drift: use a machine learning model and feature importance.

Let's look into the details.

PSI to Detect Concept Drift

PSI = sum((actual_percentage_i - expected_percentage_i) * ln(actual_percentage_i / expected_percentage_i))

When using PSI to detect concept drift, you need two sets of target data: the "actual" data, which represents the latest target values, and the "expected" data, which corresponds to an older set without any drift. PSI works by binning the numerical target values, where "_i" refers to the i-th bin. The "percentage" in the formula reflects the proportion each bin occupies within the total population.

๐ŸŒป Check PSI python implementation >>

The main goal of PSI is to measure the overall percentage change between two datasets, helping to identify potential drift.

  • PSI < 0.1: no significant population change

  • 0.1 <= PSI < 0.2: moderate population change

  • PSI >= 0.2: significant population change

Using PSI, Lady H. applied it to two target sets without concept drift. The PSI value is below 0.1, indicating that the distributions of the two target sets are similar."

She then applied it to another pair of target sets, where one dataset had drifted from the other. The difference in their distributions and the PSI value both indicate a significant change.

๐ŸŒป Check concept drift detection experiments >>

In Lady H.'s experiments, she focused on detecting concept drift for regression problems. For classification targets, she typically compares the distributions first, as many are binary classification tasks, making distribution comparisons straightforward. For multi-class classification, PSI is also effective since it relies on the concept of binning, allowing the PSI formula to be applied directly without additional binning.

Machine Learning to Detect Covariate Drift

Feature drift can lead to more complex mathematical discussions, and many statistical methods have limited applicability. Lady H. discovered a simple yet effective approach for detecting covariate drift: combining the old and new datasets and labeling them as "old" or "new". A machine learning model is then used to predict these labels. If the model achieves high accuracy, it indicates covariate drift, as the dataset reveals clear differences between the two datasets. To identify the features caused the drift, we can analyze the feature importance.

The code is as simple as training the data with an LGBM model using cross-validation and evaluating the average forecasting performance:

When applying this method on the dataset without covariate drift, the forecasting is showing accuracy near 0.5, similar to random guessing. This means the model can't tell any difference between the new and the old data.

Examining the feature importance from the trained model highlights "Store" as a key feature, suggesting it plays a significant role in distinguishing between the old and new data. This implies that "Store" contributes to the covariate drift.

And if we look at the feature importance from this trained model, it points to feature "Store", indicating that this feature plays an important role in differentiating the old and the new data. So we can assume this feature contributes to the covariate drift.

๐ŸŒป Check covariate drift detection experiments >>

Data Drift Monitoring Pipeline Code

As we saw in run.py, the Task Data Drift Monitoring operates independently of the model pipeline and can be executed at any time. In practice, you can schedule periodic data monitoring jobs, display the results on a dashboard, or trigger alerts whenever unexpected changes occur.

The pipeline code has helpers functions to calculate PSI for concept drifting and use LGBM for covariate drifting. Then in the task class, just need to call these helpers functions.

๐ŸŒป Check data drift monitoring config >>

๐ŸŒป Check data drift monitoring task >>

๐ŸŒป Check data drift monitoring helpers >>

Tests

It's good practice to include both unit and integration tests in the pipeline to ensure that code changes don't break any functionality. Some companies use mockup data for testing, but Lady H. strongly recommends using real client data whenever possible. This makes it easier to debug real-world use cases early and provides more flexibility for scalability testing. Mockup data is useful for covering all edge cases, and creating a dataset that captures all potential scenarios throughout the pipeline is also a good approach.

๐ŸŒป Check unit tests >>

๐ŸŒป Check integration tests >>

Unit tests are used to test single functions, while each integration test can be used to test each Luigi task in this pipeline.

Last updated