October 13, 2021

Demand Forecasting Models In Snowpark

By Charlie Isaksson

Welcome back to our blog series on Snowpark, the latest product from the Snowflake Data Cloud. In this post, we highlight demand forecasting with Snowpark by applying the most popular time series forecasting model, ARIMA, implemented in Java.

We will then use Snowpark to forecast, in Snowflake, the future demand for each product’s sales across different stores using the ARIMA model. (In fact, any univariate or multivariate time series forecasting model that follows a similar design pattern can be used, e.g., Prophet, Exponential Smoothing, or Orbit.)

Why Demand Forecasting is So Important

Sales forecasts play a crucial role in determining demand; an early warning system is an imperative tool for your organization to take immediate corrective action when orders fall short of targeted sales. Moreover, forecasting what can be sold helps the sales and marketing teams with forward planning and with identifying bottlenecks in the process. This case study highlights the significance of demand forecasting using time series algorithms such as the ARIMA model.

What is ARIMA Forecasting?

ARIMA is an acronym for “AutoRegressive Integrated Moving Average,” which combines three statistical models. ARIMA works by measuring events that happen over a period of time. The model is used to understand past data or predict future data in a series. The ARIMA model is a generalization of AutoRegressive Moving Average and adds the concept of integration.

The ARIMA model consists of three statistical-based models:

  • AutoRegression (AR): Refers to a model that shows a changing variable that regresses on its own lagged, or prior, values (i.e., predict future values based on past values).
  • Integrated (I): Represents the differencing of raw observations to allow the time series to become stationary (i.e., data values are replaced by the difference between the data values and the previous values to eliminate the influence of trend components).
  • Moving Average (MA): Incorporates the dependency between an observation and a residual error from a moving average model applied to lagged observations (i.e., smooths out the influence of outliers).

A standard notation used for the Non-Seasonal ARIMA is (p,d,q). The parameters of the ARIMA model are substituted with integers and are defined as follows:

  • p: The number of lag observations included in the model, also called the lag order (e.g., if p = 3 then we will use the three previous periods of our time series). This parameter influences the AutoRegression (AR) model.
  • d: The number of times that the raw observations are differenced, also called the degree of differencing (e.g., series without trend or seasonality). This parameter influences the Integrated (I) model.
  • q: The size of the moving average window. This variable denotes the lag of the error component not explained by trends or seasonality. This parameter influences the Moving Average (MA) model.

The ARIMA model predicts a given time series based on its own past values. It can be used for any nonseasonal series of numbers that exhibits patterns and is not a series of random events.

For example, sales data from a clothing store would be a time series because it was collected over a period of time. One of the key characteristics is that the data is collected over a series of constant, regular intervals.

A modified version of ARIMA, known as Seasonal ARIMA (SARIMA), can model predictions over multiple seasons. As the name suggests, this model is used when the time series exhibits seasonality. It is similar to the ARIMA model but adds a few more parameters to account for the seasons; we write it as SARIMA(p,d,q)(P,D,Q)s.

  • P: The number of seasonal autoregressive (SAR) terms.
  • D: The degree of seasonal differencing.
  • Q: The number of seasonal moving average (SMA) terms.
  • s: Refers to the number of periods in each season.

(P,D,Q) represents the seasonal counterpart of (p,d,q), except that the pattern appears across multiples of the seasonal lag s (the number of periods in a season). More on these parameters to follow.
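To make the notation concrete, here is a minimal sketch of how these parameters map onto a model API, using Python’s statsmodels with placeholder orders and an assumed monthly Series named y:

import statsmodels.api as sm

# Placeholder orders for illustration only; y is assumed to be a pandas Series
# with a monthly DatetimeIndex.
model = sm.tsa.statespace.SARIMAX(
    y,
    order=(1, 0, 1),               # non-seasonal (p, d, q)
    seasonal_order=(1, 0, 1, 12)   # seasonal (P, D, Q, s); s = 12 for monthly data
)
fitted = model.fit()
print(fitted.summary())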

Exploratory Data Analysis 

One challenge with retail data is making decisions based on limited historical data. Holidays and select major events come once a year, and so does the chance to see how strategic decisions impacted the bottom line. For the example in this blog post, we use the Retail Data Analytics dataset found on Kaggle. The dataset contains historical sales from 45 stores located in different regions, and each store contains a number of departments. The key challenge associated with this dataset is to answer the question: “What are the monthly sales for each store and department?”

(You can see more information about the features used here.)

Figure 1: Average monthly changes in fuel price from all stores.

The first thing we’ll investigate is how the fuel price increased by a whopping 35% between December 31, 2010 and May 31, 2011 (see Figure 1).

Figure 2: The all-store average fuel price is steadily increasing on both workdays and holidays.

The median fuel price steadily increases with higher temperatures, except for holidays with temperatures above 80°F, where the fuel price is higher than on workdays. There are also some outliers for holidays with temperatures around 90°F (see Figure 2).

Next, we look at the weekly sales over three years. In Figure 3 we observe two peaks, one toward the end of 2010 and one toward the end of 2011. In 2012, however, we don’t see a peak, which might be due to missing observations during the last two months of 2012.

Figure 3: Weekly sales over a span of three years from all stores.

We can look at Table 1 for the top-performing stores in terms of sales. We will select store 20 for further analysis.

df_top_stores = df.groupby(by=['Store'], as_index=False)['Weekly_Sales'].sum()
df_top_stores.sort_values('Weekly_Sales', ascending=False)[:3]

Store    Weekly Sales
20       3.013978e+08
4        2.995440e+08
14       2.889999e+08

Table 1: Top three performing stores by total weekly sales.

Next, we apply the Granger causality test to check for causality among these series. The Granger causality test is a statistical hypothesis test for determining whether one time series is useful in forecasting another. The null hypothesis is that the past values of one time series (X) do not cause the other series (Y). So if the p-value obtained from the test is less than the significance level of 0.05, we can safely reject the null hypothesis.
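As an illustration, a single pairwise check could be run with statsmodels as sketched below. This assumes df holds the series as numeric columns; the column names are illustrative and taken from the dataset description.

from statsmodels.tsa.stattools import grangercausalitytests

# Does 'Temperature' Granger-cause 'Weekly_Sales'? In statsmodels' convention,
# the second column is the candidate cause of the first column.
results = grangercausalitytests(df[['Weekly_Sales', 'Temperature']], maxlag=4)

# p-value of the ssr F-test at lag 1, for example; p < 0.05 rejects the null
# hypothesis that 'Temperature' does not Granger-cause 'Weekly_Sales'.
p_value = results[1][0]['ssr_ftest'][1]
print(p_value)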

Table 2: Granger causality test.

Looking at the p-values in Table 2, we can see that most of the variables (time series) cause each other in both directions, which indicates a multivariate relationship and a good candidate for VAR model forecasting.

We can also run a cointegration test, which tries to establish the presence of a statistically significant connection between two or more time series. Examining Tables 3 and 4, we can see that the p-values are consistently low, which indicates good candidate pairs. Low p-values typically indicate that a pair is cointegrated; a p-value < 0.1 is a good starting point to further investigate the properties of these time series.
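A sketch of how pairwise results like those in Table 4 could be produced with statsmodels; the column names are assumed to match the dataset and df is assumed to hold the numeric series.

from itertools import combinations
from statsmodels.tsa.stattools import coint

cols = ['Temperature', 'MarkDown1', 'MarkDown2', 'Weekly_Sales', 'IsHoliday']
for a, b in combinations(cols, 2):
    score, pvalue, _ = coint(df[a], df[b])
    if pvalue < 0.1:  # a low p-value suggests the pair is cointegrated
        print((a, b, pvalue))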

Table 3: Cointegration test with clustering.

('Temperature', 'MarkDown2', 0.049614168325526405)
('Temperature', 'Weekly_Sales', 0.045851537096833136)
('Temperature', 'IsHoliday', 0.048208986408574975)
('MarkDown1', 'MarkDown3', 0.045954097233945744)
('MarkDown1', 'IsHoliday', 0.04451391308013215)
('MarkDown2', 'MarkDown4', 3.787262086222212e-21)
('MarkDown2', 'MarkDown5', 1.2178358572363094e-21)
('MarkDown2', 'IsHoliday', 4.462177466189403e-09)
('MarkDown3', 'MarkDown4', 5.099886292604155e-21)
('MarkDown3', 'MarkDown5', 7.262461052490846e-21)
('MarkDown3', 'Weekly_Sales', 5.235258827389919e-21)
('MarkDown3', 'IsHoliday', 1.5999559264629843e-20)
('MarkDown4', 'MarkDown5', 7.408357912668843e-05)
('MarkDown4', 'Weekly_Sales', 0.004207771884579197)
('MarkDown4', 'IsHoliday', 0.003664782539600722)
('MarkDown5', 'Weekly_Sales', 0.02558003341492836)
('MarkDown5', 'IsHoliday', 4.881789257809373e-06)
('Weekly_Sales', 'IsHoliday', 3.0234526175351498e-18)

Table 4: Cointegration test pairs. The numbers are p-values below the 10% critical level.

Model Training

From the previous section, we can conclude that the dataset is a good candidate for multivariate time series forecasting and that holidays have an influence on the fuel price. Moreover, the weekly sales show seasonality, as a recurring correlation exists in both the ACF and PACF (see Figure 5). However, in this post we aim to demonstrate the capabilities of Snowpark, so we set aside the more advanced time series features, such as adding holidays and temperature as external variables for ARIMA.

We resample each time series to a monthly frequency to avoid gaps in the time component.

s20 = df_top_stores['Weekly_Sales'].resample('M').sum()

In order to apply ARIMA, the data needs to be stationary, meaning the statistical properties of the time series shouldn’t change with respect to time. Most statistical models assume or require the time series to be stationary. We may be able to visualize the data to identify a changing mean or variance (the measurement of the spread between numbers in a dataset). However, in reality it’s difficult to conclude that a time series is stationary from a simple visual inspection. A more reliable method is the Dickey-Fuller test: if the test statistic is smaller than the critical value (typically at the 5% level), the time series is deemed stationary. Likewise, the smaller the p-value, the more likely the series is stationary.
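A minimal sketch of how the test behind Table 5 can be run with statsmodels, assuming s20 is the monthly sales Series for store 20:

import pandas as pd
from statsmodels.tsa.stattools import adfuller

result = adfuller(s20, autolag='AIC')
out = pd.Series(result[0:4],
                index=['Test Statistic', 'p-value', '#Lags Used',
                       'Number of Observations Used'])
# Append the critical values returned by the test
for key, value in result[4].items():
    out[f'Critical Value ({key})'] = value
print(out)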

Figure 4 and Table 5 show that the time series can be considered stationary: the test statistic is smaller than the 1% critical value (CV), and the p-value is well below 0.05.

Figure 4: Rolling mean and standard deviation from 2010 to 2013 for store 20.

<Results of Dickey-Fuller Test>
Test Statistic                 -5.361429
p-value                         0.000004
#Lags Used                      1.000000
Number of Observations Used    34.000000
Critical Value (1%)            -3.639224
Critical Value (5%)            -2.951230
Critical Value (10%)           -2.614447
dtype: float64

Table 5: The Dickey-Fuller test statistic for the retail dataset for store 20.

Now that we know our time series is stationary, we can attempt to find the appropriate SARIMA parameters.

In our use case it is clear that s = 12, but how do we set the other parameters?

There are numerous best practices to consider. Fortunately, there are Auto-ARIMA implementations that set the parameters automatically. However, at the time of writing this post, there isn’t an implementation for Scala or Java. Since we are going to use a Java-based SARIMA model that requires us to set the parameters manually, we will explain in more depth the selection strategy you can use to estimate your ARIMA model parameters.
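For Python users, the pmdarima package offers such an automatic search. A minimal sketch, assuming pmdarima is installed and s20 is the monthly sales Series:

import pmdarima as pm

# Stepwise search over (p, d, q)(P, D, Q) with a 12-period season
auto_model = pm.auto_arima(s20, seasonal=True, m=12,
                           stepwise=True, suppress_warnings=True)
print(auto_model.order, auto_model.seasonal_order)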

There are two statistical functions regularly used to set the SARIMA parameters (a plotting sketch follows the list), specifically:

  • AutoCorrelation Function (ACF)
  • Partial AutoCorrelation Function (PACF)
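For reference, a minimal sketch of how plots like Figure 5 can be produced, assuming s20 is the monthly sales Series and statsmodels and matplotlib are available:

import matplotlib.pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6))
plot_acf(s20, lags=12, ax=ax1)    # informs the MA (q) and seasonal MA terms
plot_pacf(s20, lags=12, ax=ax2)   # informs the AR (p) and seasonal AR terms
plt.tight_layout()
plt.show()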

Non-Seasonal Component of the ARIMA Model

To estimate the amount of AR terms (the p term for ARIMA), we need to look at the PACF plot from Figure 5. First, ignore the value at lag 0. It will always show a perfect correlation, since we are estimating the correlation between today’s value with itself.

Note that there is a blue area in the plot, representing the confidence interval.

To estimate how many AR terms you should use, start counting how many “vertical lines” are above or below the confidence interval before the next one enters the blue area.

Looking at the PACF plot above, we can estimate to use 7 AR terms for our model, since lag 7 is out of the confidence interval, and lag 8 is in the blue area.

Figure 5: Monthly sales indicate a recurring correlation exists in both ACF and PACF.

Next, we estimate the I term (the d term for ARIMA). This one is easy: all we need to know is how much differencing was used to make the series stationary. Looking at Table 5, we determined via the Dickey-Fuller test that our time series is already stationary, so we set d = 0. However, if we had used a log difference or first difference to transform the time series, d would be set to 1.
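If the series had not been stationary, a common next step (sketched below, assuming s20 is the monthly sales Series) would be to take a first or log difference and re-run the Dickey-Fuller test:

import numpy as np

# First difference: if this makes the series stationary, set d = 1.
s20_diff = s20.diff().dropna()

# Alternatively, a log difference damps multiplicative trends.
s20_log_diff = np.log(s20).diff().dropna()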

To estimate the amount of MA terms (the q term for ARIMA), we need to look at the ACF plot. The same logic is applied here: how many “vertical lines” are above or below the confidence interval before the next “vertical line” enters the blue area?

In our example, we can estimate 0 MA terms, since we do not have any lag out of the confidence interval.

Seasonal Component of the ARIMA Model

Finally, we can estimate the seasonal AR terms (the P term for SARIMA). The process is quite similar to non-seasonal AR, and we will still use the ACF and PACF function for that.

To estimate the amount of seasonal AR terms, we will look at the PACF function. Instead of counting how many “vertical lines” are out of the confidence interval, we will count how many seasonal “vertical lines” are out.

In our example, as the data was collected on a monthly basis and we have a yearly seasonality, we need to check whether the “vertical lines” at lags 12, 24, 36, etc. are out of the confidence interval area. If so, we add 1 term for seasonal AR. In Figure 5 we can see that we don’t have any “vertical lines” outside the confidence interval area at those lags, so we add 0 seasonal AR (SAR) terms.

For estimating Seasonal I terms (the D term for SARIMA), we follow the same logic of estimating non-seasonal differencing. As we did not use seasonal differencing, we do not add 1 for seasonal differencing.

For the seasonal moving average (the Q term for SARIMA), we look at the ACF plot in Figure 5 using the same logic as for estimating SAR terms. In our example, we don’t see any significant correlation at lags 12, 24, 36, etc., so we add 0 SMA terms.

Final considerations: at the end of this process we have the starting terms needed to build our SARIMA(7, 0, 0)(0, 0, 0)12 model. In reality, we may need to further refine these terms.

Build a Simple SARIMA Model

Let’s split the dataset into training and testing sets for backtesting. We will forecast the next seven months:

train = s20.iloc[:-7]
test = s20.iloc[-7:]

We build a SARIMA model with the parameters we identified earlier.

sarima_xt = sm.tsa.statespace.SARIMAX(train['Weekly_Sales'],
                                      trend='n',
                                      order=(7, 0, 0),
                                      seasonal_order=(0, 0, 0, 12))
sarima_xt = sarima_xt.fit()
print(sarima_xt.summary())

Below is the SARIMA training summary. From it, we can see that the AIC score is low, which suggests the estimated terms are a good first step. The lower the AIC score, the better the SARIMA fit.

                             SARIMAX Results                                
==============================================================================
Dep. Variable:           Weekly_Sales   No. Observations:                   29
Model:               SARIMAX(7, 0, 0)   Log Likelihood                  -0.649
Date:                Mon, 26 Jul 2021   AIC                             17.297
Time:                        16:41:01   BIC                             28.236
Sample:                    01-31-2010   HQIC                            20.723
                         - 05-31-2012                                         
Covariance Type:                  opg                                         
==============================================================================
                 coef    std err          z      P>|z|      [0.025      0.975]
------------------------------------------------------------------------------
ar.L1         -0.2348      0.250     -0.938      0.348      -0.725       0.256
ar.L2          0.2125      0.281      0.756      0.450      -0.338       0.764
ar.L3          0.4728      0.239      1.975      0.048       0.004       0.942
ar.L4          0.2246      0.222      1.010      0.313      -0.211       0.660
ar.L5          0.2173      0.260      0.837      0.402      -0.291       0.726
ar.L6         -0.2693      0.350     -0.770      0.442      -0.955       0.417
ar.L7          0.3767      0.320      1.175      0.240      -0.251       1.005
sigma2         0.0340      0.010      3.463      0.001       0.015       0.053
===================================================================================
Ljung-Box (L1) (Q):                   0.07   Jarque-Bera (JB):                 9.60
Prob(Q):                              0.79   Prob(JB):                         0.01
Heteroskedasticity (H):               5.93   Skew:                            -0.93
Prob(H) (two-sided):                  0.01   Kurtosis:                         5.11
===================================================================================

We then call the prediction function to obtain the seven-month forecast from our trained SARIMA model.

# Forecast through the end of the test period
pred = sarima_xt.get_prediction(start=test.index.min(),
                                end=test.index.max(),
                                dynamic=True,
                                full_results=True)
pred_ci = pred.conf_int(alpha=0.20)
fc_series = pred.predicted_mean

Figure 6: The forecasted seven months with the SARIMA model.

Demand Forecasting on Snowflake with Snowpark

Previously, we demonstrated SARIMA forecasting performance for one store. However, that wasn’t the full use case: we were asked to forecast each store and department. At this point we may scratch our heads, realizing we have 45 stores and 81 departments (or 3,645 unique time series).

In reality, we may face tens of thousands or even millions of time series. Not only do we have to deal with that many time series, but also the complexity of coding and maintaining concurrent applications. As new data comes in, the previous model has to be discarded and a new model retrained on the new data.

This is where Snowflake steps in. Snowpark solves this dilemma in a very elegant and simple way: with just a few lines of code, we can process millions of time series while still being able to update the database.

In this section we implement a SARIMA model in a Scala application and call the Snowflake database via the Snowpark framework to forecast the weekly sales of every store and department. Note that we can’t automatically set the SARIMA parameters (p, d, q)(P, D, Q)s without implementing a function that estimates the optimal parameters.

The remainder of this post will focus on showcasing the forecasting capabilities. Therefore, we will freeze the parameters and execute under the same conditions for all the time series. That may impact our forecasting performance.

Preparing your Snowpark Environment

To integrate the Snowpark library into your Maven project, follow these two blog posts, which go into great detail on installing and setting up your project:

Complete Installation Guide of Snowpark on Linux

How to Install Snowpark on IntelliJ IDEA

You may also need to include the plugins for compiling Scala files (net.alchim31.maven) and for bundling all dependencies into your jar file (maven-shade-plugin) in your project’s pom.xml file.

Creating a SARIMA Model

The first step to applying your SARIMA forecasting model is to establish a session with the Snowflake database. Before executing the sample code in Scala, replace all the <placeholders> with your Snowflake connection information (more information on the Creating a Session process can be found here or for secure authentication read here).

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    // Replace the <placeholders> below.
    val configs = Map(
      "URL" -> "https://<account>.snowflakecomputing.com:443",
      "USER" -> "<user name>",
      "PASSWORD" -> "<password to the database>",
      "ROLE" -> "<role name>",
      "WAREHOUSE" -> "<warehouse name>",
      "DB" -> "<database name>",
      "SCHEMA" -> "<schema name>"
    )
    val session = Session.builder.configs(configs).create
  }
}

The next step is to create a database and upload your Retail dataset to your database via the web UI (a detailed step-by-step tutorial on creating a database and schema and uploading data can be found here). The following is the structure/schema for the Retail records:

val sale_schema = StructType(
  StructField("STORE_ID", StringType, nullable = true) ::
    StructField("DEPT_ID", IntegerType, nullable = true) ::
    StructField("DATE_SALE_ARRAY", StringType, nullable = true) ::
    Nil
)

val saleDf = session.read.schema(sale_schema).table("store_dept_salesarray")

We can do a data sanity check by executing the SQL query below, which should show the output presented in Figure 7.

SELECT * FROM "<database name>"."<schema name>".SALES;

Figure 7: Shows a sample from the retail dataset uploaded to the Snowflake database.

The next code snippet is the magic bullet for forecasting a massive number of time series. With just a few lines of code, we regroup the dataset by store and department so that each row in the resulting table represents a full time series. Looking closely at Figure 8, the column “DATE_SALE_ARRAY” contains the full time series as pairs of timestamps ti and sales values Si, i.e., <<t1, S1>, <t2, S2>, …, <tn, Sn>>, where i = 1, 2, …, n indexes the observations.

In the code below, “concat” builds each date and sales pair separated by a “#” tag (any separator can be used), “array_agg” collects all the pairs into an array, and “array_to_string” joins the entire array into one string with a “_” tag.

Wondering why? The Snowpark User Defined Function (UDF) interface does not support an “array of arrays.” Keeping each time series as a string allows it to be stored and passed efficiently, especially since individual time series tend to be relatively small.

CREATE TABLE STORE_DEPT_SALES_ARRAY AS
SELECT s.store_id,
       s.dept_id,
       array_to_string(array_agg(s.date_sales), '_') AS date_sale_array
FROM (SELECT store_id,
             dept_id,
             concat(trim(date_ts, '"'), '#', to_varchar(sales_amt)) AS date_sales
      FROM "<database name>"."<schema name>".SALES) s
GROUP BY s.store_id, s.dept_id;
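For readers following along in Python, here is a tiny illustrative sketch (with made-up values) of how one such encoded row can be decoded back into dates and sales:

# Illustrative only: a hypothetical encoded row in the "date#sales" pairs
# joined-by-"_" layout described above.
encoded = "2010-01-31#12.30_2010-02-28#11.80_2010-03-31#13.10"

pairs = [p.split('#') for p in encoded.split('_')]   # [['2010-01-31', '12.30'], ...]
dates, sales = zip(*pairs)
sales = [float(s) for s in sales]
print(list(dates), sales)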

Run the SQL query below and you should obtain the output in Figure 8.

SELECT * FROM "<database name>"."<schema name>".STORE_DEPT_SALES_ARRAY;

Figure 8: Shows a sample from the retail dataset in the Snowflake database, where each row represents a time series.

Next in line is the heart of the engine. We define a UDF that processes each time series and calls the SARIMA model (“Arima.forecast_arima”) to produce a forecast. In this example we save the forecast, the test series, the upper and lower bounds based on the 95% confidence interval, and the Root Mean-Square Error (RMSE) validation metric.

val salesForecastUDF = udf((item: String) => {
  // Each row is a list of "date#sales" pairs joined with "_"
  val data: Array[String] = item.split("_")
  val data_x: Array[Array[String]] = data.map(x => x.split('#'))
  val sales: Array[Double] = data_x.map(x => x(1).toDouble)

  // p, d, q, P, D, Q, m and forecastSize are defined elsewhere and frozen for all series
  val train: Array[Double] = sales.reverse.takeRight(sales.length - forecastSize).reverse
  val test: Array[Double] = sales.takeRight(forecastSize)

  try {
    val params: ArimaParams = new ArimaParams(p, d, q, P, D, Q, m)
    HannanRissanen.estimateARMA(train, params, forecastSize, 1000)

    val forecastResult = Arima.forecast_arima(train, forecastSize, params)

    // Read forecast values
    val forecastData = forecastResult.getForecast

    // You can obtain upper and lower bounds of confidence intervals on forecast values.
    // By default, it computes at the 95% confidence level. This value can be adjusted in ForecastUtil.java
    val uppers: Array[String] = forecastResult.getForecastUpperConf.map(_.toString)
    val lowers: Array[String] = forecastResult.getForecastLowerConf.map(_.toString)

    // You can also obtain the root mean-square error as a validation metric.
    val rmse: Double = forecastResult.getRMSE

    // Pack forecast, test series, confidence bounds, and RMSE into one "|"-separated string
    Array(forecastData.map(x => f"$x%1.2f").mkString("_"),
      test.map(x => f"$x%1.2f").mkString("_"),
      uppers.mkString("_"),
      lowers.mkString("_"),
      rmse.toString).mkString("|")
  } catch {
    case re: RuntimeException =>
      println(re)
      "NO_FORECAST"
  }
})

The code below calls the UDF, adding a new column named “forecast.” See the output in Figure 9.

val output_forecast = saleDf.withColumn("forecast", salesForecastUDF(col("DATE_SALE_ARRAY")))

We can save the Snowpark DataFrame with the code below. The DataFrame will be saved in the Snowflake database as a table named “ARIMA_SALE_FORECAST”.

output_forecast.write.mode(SaveMode.Overwrite).saveAsTable("ARIMA_SALE_FORECAST")

Execute the SQL query below to show a sample of our forecasted sales across each store and department.

SELECT * FROM "<database name>"."<schema name>".ARIMA_SALE_FORECAST;

Figure 9: Shows a sample from the retail dataset in the Snowflake database with a new column “FORECAST” that contains forecasts derived from the “DATE_SALE_ARRAY” column next to it.

Analyze the Overall Performance From All Time Series

We can’t conclude this example without showing how well we did. For this, we turn to a Jupyter notebook in Python. We use the same authentication credentials mentioned earlier; just replace all the <placeholders> with your Snowflake connection information and use the Snowflake Connector library. (More on this here.)

import os
import sys
import pandas as pd

import snowflake.connector as sc
import snowflake.connector.pandas_tools as pt

ctx = sc.connect(
    user='<user name>',
    password='<password to the database>',
    account='<account>',
    warehouse='<warehouse name>',
    database='<database name>',
    schema='<schema name>',
    protocol='https',
    port=443
)

The Snowflake connector returns the executed query as a Pandas DataFrame. The code below shows the data types and the number of features. Here, the Python object dtype refers to the string type.

cur = ctx.cursor()
query = cur.execute("SELECT * FROM <database name>.<schema name>.ARIMA_SALE_FORECAST")

sf_forecast_df = query.fetch_pandas_all()
sf_forecast_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2308 entries, 0 to 2307
Data columns (total 4 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   STORE_ID         2308 non-null   object
 1   DEPT_ID          2308 non-null   int8  
 2   DATE_SALE_ARRAY  2308 non-null   object
 3   FORECAST         2308 non-null   object
dtypes: int8(1), object(3)
memory usage: 56.5+ KB

The code snippet below extracts the RMSE from the “FORECAST” column created earlier. See the output in Figure 10.

import numpy as np

sf_forecast_df = sf_forecast_df[sf_forecast_df.FORECAST != 'NO_FORECAST']

def get_score(x):
    # The RMSE is the fifth "|"-separated field written by the UDF
    score = [v.strip() for v in x['FORECAST'].split('|')]
    return np.float16(score[4])

sf_forecast_df['RMSE'] = sf_forecast_df.apply(lambda x: get_score(x), axis=1)

Figure 10: Shows a sample from the retail dataset where we add a new column extracted from the “FORECAST” column.

Figure 11 shows the mean monthly difference between the sales forecast and the actuals over a six-month period. More than 86% of the series are below one Root Mean-Square Error (RMSE), shown on the x-axis, and 0.2% are below two RMSE. A negligible percentage of the sales time series are below four RMSE.

plotting_hist(sf_forecast_df['RMSE'])
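The plotting_hist helper is not shown in the post; a hypothetical minimal version that plots the RMSE distribution as percentages might look like this:

import matplotlib.pyplot as plt
import numpy as np

def plotting_hist(rmse_series, bins=20):
    # Weight each series so bar heights sum to 100 (percent of all time series)
    weights = np.ones(len(rmse_series)) / len(rmse_series) * 100
    plt.figure(figsize=(10, 5))
    plt.hist(rmse_series, bins=bins, weights=weights)
    plt.xlabel('RMSE')
    plt.ylabel('Percentage of time series (%)')
    plt.title('Distribution of forecast RMSE across all store/department series')
    plt.show()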

Figure 11: Shows the percentage distribution of the mean difference between forecasted and actual sales over a six-month period.

The code snippet below processes each time series and converts it from a string back into the forecast, test series, upper and lower confidence intervals, and RMSE validation metric. We select one store and department to inspect visually. Figure 12 shows the output of this code.

def get_ts(x):
    ts_forecast = [v.split('_') for v in x['FORECAST'].values[0].split('|')]

    forecast = list(map(np.float32, ts_forecast[0]))
    test = list(map(np.float32, ts_forecast[1]))
    uppers = list(map(np.float32, ts_forecast[2]))
    lowers = list(map(np.float32, ts_forecast[3]))

    ts_org = [v.split('#') for v in x['DATE_SALE_ARRAY'].values[0].split('_')]
    dates, sales = map(list, zip(*ts_org[-len(forecast):]))

    try:
        tmp_df = pd.DataFrame(index=pd.DatetimeIndex(dates, freq='M'))
        tmp_df['test'] = np.exp(test)
        tmp_df['forecast'] = np.exp(forecast)
        tmp_df['uppers'] = np.exp(uppers)
        tmp_df['lowers'] = np.exp(lowers)
        tmp_df['RMSE'] = x['RMSE'].values[0]
        return tmp_df
    except ValueError:
        return None

forecast_df = sf_forecast_df.groupby(['STORE_ID', 'DEPT_ID']).apply(lambda x: get_ts(x))

PRO TIP: In practice, we would not retrieve all the time series hosted in Snowflake at once. We would also partition the data (i.e., by store, department, year, month, day, etc.) in order to retrieve it efficiently.

Figure 12: The output from the processed forecast.

Finally, we randomly select one of the time series with the lowest RMSE and visualize the forecast over the six months. We also rebuild the training time series from the “DATE_SALE_ARRAY” column by extracting the timestamp components from the string representation. Figure 13 shows the predicted versus actual sales for a series sampled from the groups in Figure 11 with the smallest RMSE. We selected Store ID 28 and Department ID 94.

import matplotlib.pyplot as plt

forecast_df.reset_index(inplace=True)

stor28_dept94 = forecast_df[forecast_df.STORE_ID == '28']
stor28_dept94 = stor28_dept94[stor28_dept94.DEPT_ID == 94]

act_ts = sf_forecast_df[sf_forecast_df.STORE_ID == '28']
act_ts = act_ts[act_ts.DEPT_ID == 94]['DATE_SALE_ARRAY']

# Rebuild the training series from the "date#sales" string representation
t = [v.split('#') for v in act_ts.to_frame()['DATE_SALE_ARRAY'].values[0].split('_')]
dates, sales = map(list, zip(*t[:-6]))

tmp_df = pd.DataFrame(index=pd.DatetimeIndex(dates, freq='M'))
tmp_df['train'] = np.exp(np.float32(sales))

stor28_dept94.set_index('level_2', inplace=True)

# Plotting the time series
fig, ax1 = plt.subplots(figsize=(12, 7), dpi=75)
ax1.plot(tmp_df, label='training')
ax1.plot(stor28_dept94['test'], label='actual')
ax1.plot(stor28_dept94['forecast'], label='SARIMA forecast')

ax1.fill_between(stor28_dept94.index,
                 stor28_dept94['uppers'],
                 stor28_dept94['lowers'], color='k', alpha=.15)

ax1.set_xlabel('Date')
ax1.set_ylabel('Sales')

ax1.set_title('Forecast vs Actuals')
plt.xticks(rotation=45)

fig.legend(loc='upper left', fontsize=8)

Figure 13: Shows sales over a six-month period for Store 28 and Department 94, where RMSE < 0.02.

Conclusion

This post should give you a better understanding of how a Snowpark UDF can be used to perform complex tasks, particularly demand forecasting on sales data with limited history. We used the most popular time series forecasting algorithm, ARIMA, and walked through the inner workings of selecting its parameters.

Final Remarks:

  • Always check whether your time series is stationary (i.e., whether it has a trend or seasonality that is constantly increasing or decreasing).
  • If your time series is not stationary, apply differencing. (Always re-check stationarity after differencing and record how many times you differenced.)
  • Use a validation sample to backtest your performance.
  • Select the AR and MA terms by using the ACF and PACF functions.
  • Construct timestamp/value pairs to represent a time series by using Snowflake’s array_agg function.
  • Create the SARIMA model (or any other forecasting algorithm) inside a Snowpark UDF to process a massive number of time series in a distributed fashion.
  • Always validate model performance by comparing the predicted values to the actuals from the holdout data sample.

One critical question remains: what happens after you deploy your model? What are the best practices for monitoring model degradation, and how do you detect missing data or outliers in your input dataset? We recommend reading this blog post, The Ultimate MLOps Guide, which explains model deployment in production.

If your team is interested in learning more about ML models running in Snowflake, feel free to reach out to the phData ML team. We’re here to help! 
