June 21, 2021

Executing Machine Learning Models In Snowpark

By Charlie Isaksson

Welcome back to our blog series on Snowpark, the latest product from the Snowflake data cloud. In this post, we aim to highlight the use of machine learning with Snowpark by applying the XGBoost algorithm to a dataset using scikit-learn (or sklearn) in Python and exporting the model to an open format called PMML, the Predictive Model Markup Language.

We will then use Snowpark to classify data in Snowflake to predict whether the passengers on board the Titanic survived the voyage using the XGBoost model.

What is XGBoost Classification?

XGBoost is a supervised learning algorithm and one of the most popular ML algorithms due to its tendency to yield highly accurate results. It is a decision-tree ensemble method that uses a gradient boosting framework. Essentially, XGBoost builds many trees in sequence: one tree predicts the target, the next predicts the residuals of the first tree (the residual is the error, calculated as the difference between the observed value and the value the model predicts for that observation), a third tree predicts the residuals of the second, and so on.

It combines the estimates of simpler, weaker models (trees) to achieve high accuracy, a technique common to all gradient boosting models. In addition, XGBoost employs a powerful technique known as regularization, which helps the model generalize (that is, adapt properly to new, previously unseen data).
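
To make the residual-fitting idea concrete, here is a minimal, purely illustrative sketch that chains two plain regression trees on made-up data (this is not how XGBoost is implemented internally; XGBoost adds gradient-based optimization and regularization on top of this idea):

# Illustrative only: fit a tree, then fit a second tree to the first tree's residuals
import numpy as np
from sklearn.tree import DecisionTreeRegressor

X = np.random.rand(100, 3)                    # made-up features
y = 2 * X[:, 0] + np.random.rand(100)         # made-up target

tree1 = DecisionTreeRegressor(max_depth=2).fit(X, y)
residuals = y - tree1.predict(X)              # errors of the first tree
tree2 = DecisionTreeRegressor(max_depth=2).fit(X, residuals)

# The combined prediction adds the residual tree's correction to the first tree
ensemble_pred = tree1.predict(X) + tree2.predict(X)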

Sample Dataset

The dataset used in this example is from the legendary Titanic shipwreck that occurred on April 15, 1912. The ship sank after colliding with an iceberg, killing 1502 passengers and crew (more on the dataset here). The XGBoost model tries to answer the question: “what sorts of people were more likely to survive?” You can see more information about the features used here.   

Model Training

To build the XGBoost model for the Titanic dataset, we first need to import some Python libraries:

import pandas as pd
import matplotlib.pyplot as plt
from xgboost import XGBClassifier
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import accuracy_score
from sklearn.metrics import average_precision_score
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_curve, auc, recall_score, precision_score

Next, we read the Titanic dataset as a Pandas DataFrame. This dataset will be used to predict the survival of passengers on the Titanic. To train and test the XGBoost model, the dataset needs to be split into a training dataset and a test dataset. 70 percent of the data is used to train the model, and 30 percent will be used for validation.

titanic = pd.read_csv("titanic_train.csv")

target = 'Survived'
y = titanic.pop(target)
features = titanic.columns

X_train, X_test, y_train, y_test = train_test_split(
    titanic, y, test_size=0.30, random_state=42)

Building the XGBoost Model

To build a scikit-learn XGBoost model, we create a pipeline object and populate it with any pre-processing steps and the model object. Model parameters can also be defined here, before calling the pipeline_obj.fit(X_train, y_train) method to train the model.

pipeline_obj = Pipeline([
    ("imp", SimpleImputer(strategy="median")),
    ("gbc", XGBClassifier(use_label_encoder=False))
])
pipeline_obj.fit(X_train, y_train)
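
The XGBClassifier constructor inside the pipeline is also where model parameters are set. As a minimal sketch (the hyperparameter values below are illustrative, not the ones used in this post's model), common options can be passed like this:

pipeline_tuned = Pipeline([
    ("imp", SimpleImputer(strategy="median")),
    ("gbc", XGBClassifier(use_label_encoder=False,
                          n_estimators=200,    # number of boosting rounds (illustrative value)
                          max_depth=4,         # maximum depth of each tree (illustrative value)
                          learning_rate=0.1,   # shrinkage applied to each new tree (illustrative value)
                          reg_lambda=1.0))     # L2 regularization strength (illustrative value)
])
pipeline_tuned.fit(X_train, y_train)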

Evaluating the XGBoost Model

We can call the predict method on the test dataset with the trained model.

predictions = pipeline_obj.predict(X_test)

# The output:
array([0, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1,
       0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1,
       0, 0, 0, 1, 1, 1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 0,
       1, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0,
       0, 1, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1,
       0, 0, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0, 0,
       0, 1, 1, 0, 0, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0,
       1, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 1, 1, 1, 0,
       0, 1, 0, 1, 0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1, 1, 0,
       0, 0, 0, 0])

The receiver operating characteristic (ROC) curve visualizes the effect of the chosen probability threshold on classification performance: the larger the area under the curve, the better the binary classifier. In our example, the model achieves a 77% precision-recall score.
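
As a minimal sketch (the post does not include the exact code used for the figure below), the curve and score can be computed from the pipeline's predicted probabilities using the metrics imported earlier:

# Sketch: evaluate the classifier from its predicted survival probabilities
probs = pipeline_obj.predict_proba(X_test)[:, 1]     # probability of class 1 (survived)

accuracy = accuracy_score(y_test, predictions)
avg_precision = average_precision_score(y_test, probs)
fpr, tpr, _ = roc_curve(y_test, probs)
roc_auc = auc(fpr, tpr)
print(f"accuracy={accuracy:.2f}, precision-recall score={avg_precision:.2f}, ROC AUC={roc_auc:.2f}")

# Plot the ROC curve
plt.plot(fpr, tpr, label=f"AUC = {roc_auc:.2f}")
plt.plot([0, 1], [0, 1], linestyle="--")             # chance line
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve")
plt.legend()
plt.show()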

A graph titled, "ROC Curve"

Exporting the XGBoost Model

Finally, we convert the model to a PMML file. Why use PMML? PMML enables researchers to quickly deploy models built with different programming languages such as R, Python, C++, etc. to a real-time environment running on a more robust stack like Java. One popular library for producing PMML from Python is Nyoka.

Nyoka is a Python library with comprehensive support for the latest PMML 4.4 standard. There are multiple libraries for converting scikit-learn models to PMML; however, Nyoka also has great support for other types of models, like Keras deep learning models and ARIMA forecasting models. We can simply call the xgboost_to_pmml method to save the PMML model to a file named XGB_titanic.pmml.

from nyoka import xgboost_to_pmml

f_name = "XGB_titanic.pmml"
xgboost_to_pmml(pipeline_obj, features, target, f_name)

Machine Learning Classification on Snowflake with Snowpark

The first step to applying your XGBoost PMML model is to establish a session with the Snowflake database. Before executing the sample code, replace all the <placeholders> with your Snowflake connection information (more information on the Creating a Session process can be found here).

import com.snowflake.snowpark._
import com.snowflake.snowpark.functions._

object Main {
  def main(args: Array[String]): Unit = {
    // Replace the <placeholders> below.
    val configs = Map (
      "URL" -> "https://<account>.snowflakecomputing.com:443",
      "USER" -> "<user name>",
      "PASSWORD" -> "<password to the database>",
      "ROLE" -> "<role name>",
      "WAREHOUSE" -> "<warehouse name>",
      "DB" -> "<database name>",
      "SCHEMA" -> "<schema name>"
    )
    val session = Session.builder.configs(configs).create
  }
}

The next step is to create a database and upload your Titanic dataset to your database via the web UI (a detailed step-by-step tutorial on creating a database and schema and uploading data can be found here). The following is the structure/schema for the Titanic passenger records:

import com.snowflake.snowpark.types._

val schema = StructType(
 StructField("PCLASS", LongType, nullable = true) ::
   StructField("AGE", LongType, nullable = true) ::
   StructField("SIBSP", LongType, nullable = true) ::
   StructField("PARCH", LongType, nullable = true) ::
   StructField("FARE", DoubleType, nullable = true) ::
   StructField("EMBARKED_C", LongType, nullable = true) ::
   StructField("EMBARKED_Q", LongType, nullable = true) ::
   StructField("EMBARKED_S", LongType, nullable = true) ::
   StructField("SEX_FEMALE", LongType, nullable = true) ::
   StructField("SEX_MALE", LongType, nullable = true) ::
   Nil
)

val titanic_Df = session.read.schema(schema).table("TITANIC_TABLE")

And this is the sample output from the Titanic table.

---------------------------------------------------------------------------------------------------------------------------
|"PCLASS"  |"AGE"  |"SIBSP"  |"PARCH"  |"FARE"   |"EMBARKED_C"  |"EMBARKED_Q"  |"EMBARKED_S"  |"SEX_FEMALE"  |"SEX_MALE"  |
---------------------------------------------------------------------------------------------------------------------------
|3         |22     |1        |0        |7.25     |0             |0             |1             |0             |1           |
|1         |38     |1        |0        |71.2833  |1             |0             |0             |1             |0           |
|3         |26     |0        |0        |7.925    |0             |0             |1             |1             |0           |
|1         |35     |1        |0        |53.1     |0             |0             |1             |1             |0           |
|3         |35     |0        |0        |8.05     |0             |0             |1             |0             |1           |
|3         |0      |0        |0        |8.4583   |0             |1             |0             |0             |1           |
|1         |54     |0        |0        |51.8625  |0             |0             |1             |0             |1           |
|3         |2      |3        |1        |21.075   |0             |0             |1             |0             |1           |
|3         |27     |0        |2        |11.1333  |0             |0             |1             |1             |0           |
|2         |14     |1        |0        |30.0708  |1             |0             |0             |1             |0           |
---------------------------------------------------------------------------------------------------------------------------

Once we have the dataset uploaded into the database, we can start by creating a class to load the saved PMML model. The sample code below takes a base path as input and returns the loaded model.

import org.pmml4s.model.Model

class XGBPMML(modelPath: String) {
  def getModel(): Model = {
    val xgb: Model = Model.fromFile(s"$modelPath/lib/pmml/XGB_titanic.pmml")
    xgb
  }
}

Next, we need to add the pmml4s_2.12-0.9.11.jar and spray-json_2.12-1.3.5.jar files as dependencies for Snowpark so that they are uploaded to Snowflake when the model runs. The code sample below shows how to register these dependencies with the Snowpark session.

val libPath = new java.io.File("").getAbsolutePath
println(libPath)
session.addDependency(s"$libPath/lib/pmml4s_2.12-0.9.11.jar")
session.addDependency(s"$libPath/lib/spray-json_2.12-1.3.5.jar")

Finally, the XGBoost model can be used to classify new records. The following example creates a Snowpark user-defined function (or UDF) named transformationUDF, which calls the PMML model's predict method and returns the prediction as a new column called Survived on the Snowpark DataFrame.

val model = new XGBPMML(libPath).getModel()

val transformationUDF = udf((pclass: Long, age: Long, sibsp: Long,
                             parch: Long, fare: Double, embarked_c: Long,
                             embarked_q: Long, embarked_s: Long,
                             sex_female: Long, sex_male: Long) => {
  val v = Array[Any](pclass, age, sibsp, parch, fare, embarked_c, embarked_q,
                     embarked_s, sex_female, sex_male)
  model.predict(v).last.asInstanceOf[Long]
})

// withColumn returns a new DataFrame, so capture the result before calling show()
val predicted_Df = titanic_Df.withColumn("Survived",
  transformationUDF(col("PCLASS"), col("AGE"), col("SIBSP"), col("PARCH"),
                    col("FARE"), col("EMBARKED_C"), col("EMBARKED_Q"),
                    col("EMBARKED_S"), col("SEX_FEMALE"), col("SEX_MALE")))
predicted_Df.show()

Below is the sample output from the Snowpark DataFrame. It shows the column with the predicted label.

----------------------------------------------------------------------------------------------------------------------------------------
|"PCLASS"  |"AGE"  |"SIBSP"  |"PARCH"  |"FARE"   |"EMBARKED_C"  |"EMBARKED_Q"  |"EMBARKED_S"  |"SEX_FEMALE"  |"SEX_MALE"  |"SURVIVED"  |
----------------------------------------------------------------------------------------------------------------------------------------
|3         |22     |1        |0        |7.25     |0             |0             |1             |0             |1           |0           |
|1         |38     |1        |0        |71.2833  |1             |0             |0             |1             |0           |1           |
|3         |26     |0        |0        |7.925    |0             |0             |1             |1             |0           |1           |
|1         |35     |1        |0        |53.1     |0             |0             |1             |1             |0           |1           |
|3         |35     |0        |0        |8.05     |0             |0             |1             |0             |1           |0           |
|3         |0      |0        |0        |8.4583   |0             |1             |0             |0             |1           |0           |
|1         |54     |0        |0        |51.8625  |0             |0             |1             |0             |1           |0           |
|3         |2      |3        |1        |21.075   |0             |0             |1             |0             |1           |0           |
|3         |27     |0        |2        |11.1333  |0             |0             |1             |1             |0           |1           |
|2         |14     |1        |0        |30.0708  |1             |0             |0             |1             |0           |1           |
----------------------------------------------------------------------------------------------------------------------------------------

Conclusion

Hopefully, after reading this post, you'll have a better understanding of how a Snowpark UDF can be used to perform complex tasks, in this case, predicting (inferring) passenger survival using the model learned by the XGBoost ML algorithm.

If your team is interested in learning more about running ML models in Snowflake, feel free to reach out to the phData ML team; we love helping out!
