January 30, 2024

Snowpark ML: How to Do Document Classification on Snowflake

By Vinicius Olivera

This blog was originally written by Travis Hegner and updated for 2024 by Vinicius Olivera.

Snowpark ML is transforming the way that organizations implement AI solutions. Snowpark allows ML models and code to run on Snowflake warehouses. By “bringing the code to the data,” we’ve seen ML applications run anywhere from 4-100x faster than other architectures. 

Vector embeddings are a popular technique for working with unstructured data for Generative AI use cases. We won’t go fully down the road of Large Language Models in this post, but we will show:

  • How documents can be embedded as vectors
  • How to visualize those vectors
  • How to apply ML classification using those embedding vectors as features for documents

Join us on this technical walkthrough as we evaluate the practicality of the Snowflake Data Cloud, Snowpark, and Snowflake ML for machine learning use cases.

Document Vectors

With the success of word embeddings, it’s natural to represent entire documents in a similar way. In this case study, we will represent each document with a vector derived from an IDF-weighted average of the embeddings of the words that make up the document.

This form of unsupervised machine learning should put documents that use many similar words near each other in the resulting vector space, which should allow us to try some interesting classification and visualization tasks.
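To make the idea concrete, here is a minimal plain-Python sketch of the computation we will later push into Snowflake. The word_vectors and idf lookups are hypothetical stand-ins for the tables we build below.

import numpy as np

def document_vector(words, word_vectors, idf):
    # Scale each word's embedding by its IDF weight, then average the
    # scaled vectors. This is the same IDF-weighted mean we later compute
    # in Snowflake with UDFs and array_agg().
    scaled = [idf[w] * word_vectors[w] for w in words if w in word_vectors]
    return np.mean(scaled, axis=0)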

Preparing the Data

We will use a BBC news articles dataset found on Kaggle. This dataset consists of 2,225 news articles in five different categories: business, entertainment, politics, sports, and technology.

We will use several combinations of tools to create a vector that attempts to represent each article in Euclidean space. We will create more Snowflake tables to act as parts of a feature store, explore those features for value, and try our hand at a couple of different models to classify a hold-out set.

The companion notebook has scripts to support loading this data into Snowflake and was used as the entry point to the Snowflake instance.

A sample set of data from Kaggle.

The class column is either “test” or “train,” and represents to which partition the article belongs. The ID column is the unique ID of the article within a category, and the category column is the category to which the article belongs. Take note that only the category/ID combination is unique in this dataset, not the ID alone.
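For reference, the articles table we query throughout the rest of this post can be pictured roughly like the sketch below (using the session object we create in the next section). The exact DDL lives in the companion notebook, so treat the column types here as assumptions about its shape.

session.sql("""
    CREATE OR REPLACE TABLE BBC_ARTICLES (
        CLASS    VARCHAR,   -- 'train' or 'test' partition
        CATEGORY VARCHAR,   -- business, entertainment, politics, sport, tech
        ID       BIGINT,    -- unique only within a category
        WORDS    VARCHAR    -- the full article text
    )
""").collect()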

Snowflake Connection

First of all, in order to interact with Snowflake remotely, we’re going to need to:

  • Install Snowflake and Snowpark connector packages.
  • Provide Snowpark Library clients with pertinent information for authentication.

Anaconda

Snowflake’s Anaconda channel is the primary mechanism for installing packages in Snowflake. Installing Anaconda locally is a good way to match your local environment to Snowflake’s when using Snowpark Python’s API. Even though we’re working remotely, we need dependency parity between the local and remote installations.

Here is a guide to help navigate the installation process using Anaconda.

Connections Setup

For this article, we’re going to use and explore two ways of connecting to Snowflake: the Session object and the connection object. Both rely on the connection parameters we load from an environment file below.

import os

from dotenv import load_dotenv
from pathlib import Path

dotenv_path = Path('vars.env')
load_dotenv(dotenv_path=dotenv_path)

USER = os.getenv("SF_USER")
PASSWORD = os.getenv("SF_PASSWORD")
ACCOUNT = os.getenv("SF_ACCOUNT")
WAREHOUSE = os.getenv("SF_WAREHOUSE")
DATABASE = os.getenv("SF_DATABASE")
SCHEMA = os.getenv("SF_DATABASE_SCHEMA")

connection object:

import snowflake.connector

conn = snowflake.connector.connect(
    user=USER,
    password=PASSWORD,
    account=ACCOUNT,
    warehouse=WAREHOUSE,
    database=DATABASE,
    schema=SCHEMA
)

session object:

from snowflake.snowpark import Session

connection_parameters = {
    "account": ACCOUNT,
    "user": USER,
    "password": PASSWORD,
    "warehouse": WAREHOUSE,
    "database": DATABASE,
    "schema": SCHEMA
}

session = Session.builder.configs(connection_parameters).create()
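As a quick sanity check that the session is pointed at the right place, you can run something like this minimal sketch:

print(session.sql("SELECT CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA()").collect())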

Training the Word Vectors

Extracting Tokens

First, we create a UDF that takes in a string and breaks it into individual words, excluding punctuation. A UDF is a function that operates on a single row in a Snowflake table to produce a single value. Snowpark DataFrames, on the other hand, allow us to apply operations (including UDFs) across all of the rows in a table.

## Exploring with the anonymous interface
import re

from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import ArrayType, StringType

# Define the UDF (User-Defined Function)
extract_words = udf(lambda article: 
                    [
                        x.strip() for x in re.compile(r'\b\w+\b', 
                        flags=re.IGNORECASE).findall(article.lower())
                    ],
                    return_type=ArrayType(element_type=str),
                    input_types=[StringType()],
                    name="my_udf",
                    replace=True,
                    session=session
)

This UDF can be applied to a text column in Snowflake, and it returns an array of strings that match any sequence of word characters as defined by Python’s regular expression engine. We will apply this UDF to each article and write the resulting tokens to a local file to feed a gensim Word2Vec model in Python later.

df = session.table("BBC_ARTICLES").filter(col("CLASS") == "train")

words = df.select(extract_words(col("WORDS")).alias("ARR")).collect()

del df

contents = []

# Each collected value is the string representation of the token array;
# strip the brackets, quotes, and newlines to leave a comma-separated word list.
for word in words:
    contents.append(
        word[0].replace(
                "[\n  ", ""
            ).replace(
                "\n  ", ""
            ).replace(
                '"', ""
            ).replace(
                "\n]", ""
            )
    )

## Writing the words.txt file
with open('words.txt', 'w') as file:
    for content in contents:
        file.write(content + "\n")

This code writes a text file where each line contains the words of one article, in order, separated by commas.
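Peeking at the first line of the file is a quick way to confirm the format. The output shown in the comment is only illustrative, since the exact article and word order depend on your table.

with open("words.txt") as f:
    print(f.readline()[:60])
# e.g. ad,sales,boost,time,warner,profit,quarterly,profits,...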

Training Embeddings

With our text file full of words, we can train a simple gensim Word2Vec model.

import logging

from gensim.models import Word2Vec

logging.basicConfig(
    format='%(asctime)s : %(levelname)s : %(message)s',
    level=logging.INFO
)

with open("words.txt") as file:
    lines = file.readlines()

docs = []
for line in lines:
    docs.append(line.strip().split(","))

model = Word2Vec(sentences=docs, vector_size=300, workers=4, epochs=1000)
model.save('w2v.model')
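Before moving on, it can be reassuring to spot-check the embeddings. A quick sketch, assuming the word you query (here "market") actually appears in the training vocabulary:

print(model.wv.most_similar("market", topn=5))  # nearest neighbours in embedding space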

Uploading Word Vectors

To use our word vectors in Snowflake, we have to create a table containing each word and its associated vector. This will allow us to join against this table and work with the vectors. Unfortunately, snowflake.connector.pandas_tools does not seem to natively support uploading a DataFrame where a column contains an embedded array, so our code will upload each vector formatted as a JSON array string.

session.sql('CREATE OR REPLACE TABLE bbc_wordvecs (word varchar, vecstr varchar, vector array)').collect()

With our table created, let’s upload our words and vector strings.

import sys

import pandas as pd
from gensim.models import Word2Vec

import snowflake.connector.pandas_tools as pt

model = Word2Vec.load("w2v.model")

words = []
for key, val in [(key, model.wv[key]) for key in model.wv.index_to_key]:
    words.append({
        "WORD": key,
        "VECSTR": '[' + ','.join([str(v) for v in list(val)]) + ']',
    })

df = pd.DataFrame(words)

success, num_chunks, num_rows, output = pt.write_pandas(conn=conn, df=df, table_name='BBC_WORDVECS')
if not success:
    print("Error writing data to snowflake table BBC_WORDVECS")
    print(output)
    sys.exit(1)

print("Successfully wrote data to snowflake:")
print(f"Num Chunks: {num_chunks}")
print(f"num_rows: {num_rows}")
print(output)

A second step is required to parse that vector string and store our vector as an array of floats.

session.sql('UPDATE BBC_WORDVECS SET VECTOR=parse_json(VECSTR)').collect()

This results in a table that contains a vector for every word in our corpus. A vector is stored as a simple Array of floating point numbers.

A sample of the resulting word-vector table.
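To spot-check the upload yourself, a quick query like the following sketch works:

session.sql("SELECT WORD, VECTOR FROM BBC_WORDVECS LIMIT 5").show()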

Now that our word vectors are ready, we can get to work creating our document vectors.

Generating Document Vectors

Scaling and Averaging Word Vectors

When using the Python API, be aware that UDFs must be registered before you reference a dataset. Because of that, we will assemble and register the necessary UDFs first, and then reference the needed datasets as proper snowpark.DataFrames.

Now that we have a flattened Data Frame of words in each article, a Data Frame of each word and its vectors, and a Data Frame of each word and its IDF score, we can join these elements together and do some scaling and aggregation.

First, we’ll need to create a couple more UDFs. This is where we start to run into some of the limitations in the current version of Snowpark. Passing an “Array[Float]” or “Array[Double]” is currently not supported, so we are forced to pass our floating point numbers as “Array[String].” At least when we do this, Snowflake implicitly coerces the floats to strings when we call the UDF during the DataFrame transformation.

import numpy as np

from snowflake.snowpark.functions import udf
from snowflake.snowpark.types import ArrayType, DoubleType, StringType

session.add_packages("numpy")

@udf(name="scale_vector", input_types=[DoubleType(), ArrayType()], return_type=StringType(), replace=True, session=session)
def scale_vectors(idf, vector):
    # Multiply every element of the word vector by the word's IDF weight and
    # return the result as a JSON-style array string.
    scaled_vector = np.multiply(vector, idf).tolist()
    return str(scaled_vector)

As you can see, we have to do some ugly workarounds to convert our vector elements to actual “Double” values and then pass the resulting vector back to Snowflake as a JSON array string. This is preferred over passing back “Array[String]” because we are now able to easily execute a “parse_json()” on the returned value, and Snowflake will treat our new vector as an “Array” of “Float” internally. This helps prevent Snowflake from hitting some serialization limits during Dataframe transformations.

Another wrench in our plan is that we don’t have a way to create user-defined aggregate functions. 

User-defined functions are great for operating on one record at a time, but we don’t have a way to operate on a grouped set of records. To work around this, we do a “.groupBy().agg(array_agg())” on our Data Frame, which collects values into an Array column per record. This works as an alternative, except in our case, we need to do aggregations on vectors that we are storing as arrays.

This means we now must pass an “Array[Array[Float]]” or “Array[Array[Double]]” or even “Array[Array[String]],” but as you may have guessed — these are all unsupported types for UDFs. Unfortunately, we have to do even uglier string-based conversions. Similar to above, Snowflake automatically coerces the Array[Array[Double]] into an Array[String] for us to operate on.

@udf(name="average_vectors", input_types=[ArrayType()], return_type=StringType(), replace=True, session=session)
def average_vectors(vecs):
    # Convert each vector's elements to floats, accumulate the element-wise
    # sum, then divide by the number of vectors to get the average.
    length = len(vecs[0])

    res = np.zeros(length, dtype=float)

    for v in vecs:
        a = list(map(float, v))
        res = res + a

    res = np.multiply(res, 1.0 / len(vecs)).tolist()

    return str(res)

As you can see in this UDF, we have to do some less-than-ideal string manipulation to calculate our desired result. On the plus side, our workarounds work well enough, and we can compute our document vectors and store them back into a new Snowflake table.

Setup Variables

Now that the UDFs are registered, we need to set up the Data Frames we want to work with, as well as some other variables we’ll need.

articles = session.table("BBC_ARTICLES")
vectors = session.table("BBC_WORDVECS").select(col("WORD"), col("VECTOR"))
corpus = articles.filter(col("CLASS") == "train").count()

Extracting Words

Next, we break the articles into separate words (tokens), flatten on the word array, and select only the needed columns.

exp = articles.with_column("ARR", extract_words(col("WORDS"))).flatten(col("ARR"))

words = exp.select(
    exp["CLASS"],
    exp["CATEGORY"],
    exp["ID"],
    exp["INDEX"],
    exp["VALUE"].alias("WORD")
)

Calculating IDF Scores

The well-known TF-IDF score is a way to measure a word’s importance across a corpus of documents. We will calculate the IDF of every word across our training corpus in order to weight each word’s vector by how much that word helps distinguish documents from one another.

The TF portion of that formula is implicitly included when we average a document’s words together.
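Concretely, the expression below computes IDF(w) = log2(N / count(w)), where N is the number of training documents and count(w) is the number of those documents containing the word. A tiny worked example with made-up numbers:

import math

# A word that appears in 5 of 1,490 training articles gets a high weight...
print(math.log(1490 / 5, 2))     # ~8.22
# ...while a word that appears in nearly every article gets a weight near zero.
print(math.log(1490 / 1400, 2))  # ~0.09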

from snowflake.snowpark.functions import col, lit, log

idfs = words\
    .filter(col("CLASS") == "train")\
    .select("CATEGORY", "ID", "WORD")\
    .distinct()\
    .groupBy("WORD").count()\
    .withColumn("IDF", log(lit(2), lit(corpus)/col("COUNT")))\
    .select("WORD", "IDF")

Creating a Table for the Document Vectors

Let’s create a table to hold our document vectors.

session.sql("CREATE OR REPLACE TABLE BBC_DOCVECS (CLASS VARCHAR, CATEGORY VARCHAR, ID BIGINT, DOCVEC ARRAY)").collect()

Loading Document Vectors

With our UDFs created and our table ready, let’s get those document vectors calculated!

from snowflake.snowpark.functions import array_agg, parse_json

words.join(idfs, "WORD") \
    .join(vectors, "WORD") \
    .withColumn("SCALEDVEC", parse_json(scale_vectors(col("IDF"), col("VECTOR")))) \
    .groupBy("CLASS", "CATEGORY", "ID") \
    .agg(array_agg(col("SCALEDVEC")).alias("SVECTORS")) \
    .select(
        col('CLASS'),
        col('CATEGORY'),
        col('ID'),
        parse_json(average_vectors(col('SVECTORS'))).alias("DOCVEC")
    ) \
    .write.mode("overwrite").save_as_table("BBC_DOCVECS")

This process may take some time. Keep this in mind while running it!

After this code runs, our table will be populated with the document vectors for our news articles.
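As a quick sanity check, the row count should line up with the 2,225 articles in the dataset, assuming every article had at least one word in the training vocabulary. A sketch:

print(session.table("BBC_DOCVECS").count())  # expecting roughly 2,225 rows, one per article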

Visualizing our Document Vectors with T-SNE

Overview

For those unfamiliar, T-SNE is a dimensionality-reduction technique that allows one to project high-dimensional data into two dimensions for visualization purposes. If our document vectors carry any meaning, we would expect articles with similar content to be nearer each other in vector space. The T-SNE plot effectively allows us to visualize that hypothesis.

Generate the Plot

We will leave out the Snowflake connection code for brevity since it was provided above. For this snippet, use the same pattern as in the “Uploading Word Vectors” section to make sure the “conn” (Snowflake connector) variable is already populated.

import numpy as np
import pandas as pd
import plotly.express as px
from sklearn.manifold import TSNE

# previous connection code here

cur = conn.cursor()
q = cur.execute(
    "select a.class, a.category, a.id, a.words, d.docvec::string as dv "
    "from bbc_articles a inner join bbc_docvecs d on a.category=d.category and a.id=d.id"
)

df = q.fetch_pandas_all()

vecs = []

# Parse the stringified vectors back into lists of floats.
for label, content in df.items():
    if label == "DV":
        for v in content:
            vecs.append([float(f) for f in v[1:len(v)-1].split(",")])
        break

arr = np.array(vecs)

emb = TSNE().fit_transform(arr)
df['emb_x'] = pd.Series(emb[:, 0])
df['emb_y'] = pd.Series(emb[:, 1])
df['title'] = df['WORDS'].str.split('\n\n').str[0]

fig = px.scatter(
    df, x="emb_x", y="emb_y",
    color="CATEGORY", symbol=df["CLASS"],
    symbol_sequence=["cross", "circle"], hover_data=['ID', 'title']
)
fig.write_html("plot.html")

The plot can be viewed by opening the plot.html file in a browser.

Again, we must treat our array of floating point numbers as a string to prevent conversion issues when reading the table into a pandas Data Frame.

As you can see, the document vectors cluster together very nicely within each category, giving us an excellent foundation for doing downstream machine learning tasks.

Classifying our Test Partition

Preparing our Python Environment

For a more rigorous test of our document vectors, let’s train a Random Forest Classifier. We’re going to leverage a Snowflake warehouse by using the snowflake.ml library.

Creating a Snowflake Stage for the Model Asset

# create the stage for storing the ML models
session.sql('CREATE OR REPLACE STAGE ML_MODELS').show()

This stage will be used to store the serialized version of the trained model obtained from the snowflake.ml library.

Training the Model

We start by importing the RandomForestClassifier object from the snowflake.ml library, which is largely modeled on scikit-learn and exposes the same interfaces.

That is to say, when in doubt about any of its possible underlying behavior, scikit-learn’s documentation can be of great help!

from snowflake.ml.modeling.ensemble import RandomForestClassifier

vecs = []

for label, content in df.items():
    if label == "DV":
        for v in content:
            vecs.append([float(f) for f in v[1:len(v)-1].split(",")])
        break

df['vector'] = pd.Series(vecs)

x_train = [np.array(l) for l in df[df['CLASS']=="train"]['vector']]
y_train = [np.array(l) for l in df[df['CLASS']=="train"]['CATEGORY']]

train_df = pd.DataFrame([np.array(l) for l in df[df['CLASS']=="train"]['vector']])
train_df = train_df.add_prefix("X_")

feature_cols = list(train_df.columns)

# Align the labels with the re-indexed training rows only.
train_df["Y"] = df[df['CLASS'] == "train"]["CATEGORY"].reset_index(drop=True)

## Appending the categorical column to the schema definition
schema_definition = feature_cols.copy()
schema_definition.append("Y")

After assembling the Data Frame, we need to make it available on Snowflake – as a temporary table in this case – so that it becomes a Snowpark dataset accessible from a warehouse, and the model can therefore be trained on Snowflake’s compute.

sf_training_dataset = session.create_dataframe(train_df, schema=schema_definition)
model = RandomForestClassifier(label_cols=["Y"]).fit(sf_training_dataset)

Model Deployment

Model Registry vs. UDF Based Deployment of Models

Currently, Snowflake’s Model Registry is in Private Preview and is not recommended for production usage, even though its capabilities look very promising.

We’re very excited to be using it in the near future!

With that in mind, this article takes the UDF-based deployment route instead, while still leveraging Snowflake’s platform and versatile compute resources for training and inference.

model_asset = model.to_sklearn()

This command returns a scikit-learn estimator that can be serialized, already carrying the fitted parameters of the trained model.
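As a quick check that the conversion produced a familiar object, you can inspect it directly. A sketch; the exact values depend on your training run:

print(type(model_asset))         # e.g. a scikit-learn RandomForestClassifier
print(model_asset.n_estimators)  # the fitted forest's hyperparameters are preserved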

UDF Assembling

First, we need to upload the newly trained model to the stage we created earlier for this purpose.

# save the model
import joblib
joblib.dump(model_asset, 'classify_docs.joblib')

# upload into the ML_MODELS Snowflake Internal Stage
session.file.put(
    "classify_docs.joblib", "@ML_MODELS", auto_compress=False, overwrite=True
)

session.clear_imports()
session.clear_packages()

# Register the above uploaded model as an import of the UDF
session.add_import("@ML_MODELS/classify_docs.joblib")

session.add_packages("joblib")
session.add_packages("snowflake-snowpark-python")
session.add_packages("snowflake-ml-python")

Next, let’s add the necessary code to do inference right inside of a Snowflake UDF.

def read_file(filename):
    import joblib
    import sys
    import os

    # where all imports are located
    import_dir = sys._xoptions.get("snowflake_import_directory")

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@udf(name="classify_docs", input_types=[ArrayType()], return_type=StringType(), replace=True, stage_location='@ML_MODELS', session=session)
def classify_docs(vecs):
    # Rebuild a single-row DataFrame with the same X_0..X_n feature columns
    # used at training time, then predict with the unpickled model.
    test_df = pd.DataFrame(
        np.array(vecs).reshape(1, -1),
        columns=[f'X_{idx}' for idx, vec in enumerate(vecs)]
    )
    pipeline = read_file('classify_docs.joblib')

    return pipeline.predict(test_df)[0]

Classify Document Vectors

With our “classify” UDF created, we can pass in a “DOCVEC” and get back a prediction that represents the most likely category for that document.
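For example, scoring a handful of test documents directly in SQL might look like this sketch:

session.sql("""
    SELECT CATEGORY, ID, classify_docs(DOCVEC) AS PREDICT
    FROM BBC_DOCVECS
    WHERE CLASS = 'test'
    LIMIT 5
""").show()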

Let’s run a batch inference on all of the documents in our test partition right inside of our Snowflake Warehouse and then use the Snowpark Data Frame API to calculate and print a confusion matrix.

# snowpark's aggregate functions (note: this `sum` shadows the Python builtin in this snippet)
from snowflake.snowpark.functions import coalesce, lit, sum

# Assumption: `test` holds the test partition of our document-vector table.
test = session.table("BBC_DOCVECS").filter(col("CLASS") == "test")

test.withColumn("PREDICT", classify_docs(col("DOCVEC")))\
  .select(col("CATEGORY"), col("PREDICT"), lit(1).alias("NUM")).pivot(
      "PREDICT",
      ["business", "entertainment", "politics", "sport", "tech"]
  ).agg(
      sum(col("NUM"))
  ).select(
        col("CATEGORY"),
        coalesce(col("'business'"), lit(0)).alias("business"),
        coalesce(col("'entertainment'"), lit(0)).alias("entertainment"),
        coalesce(col("'politics'"), lit(0)).alias("politics"),
        coalesce(col("'sport'"), lit(0)).alias("tech") if False else coalesce(col("'sport'"), lit(0)).alias("sport"),
        coalesce(col("'tech'"), lit(0)).alias("tech")
  ).show()

When we execute the above, we get a pretty good-looking confusion matrix showing that our Random Forest model actually learned some useful features from our document vectors.
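If you also want a single headline number, a rough accuracy check could look like the following sketch. It reuses the same test DataFrame and re-runs the UDF, so expect another round of inference:

preds = test.withColumn("PREDICT", classify_docs(col("DOCVEC")))
correct = preds.filter(col("CATEGORY") == col("PREDICT")).count()
total = preds.count()
print(f"Test accuracy: {correct / total:.3f}")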

Analyzing our Results

There you have it! We’ve successfully created a document classifier that uses Snowflake as a feature store and Snowpark as our primary compute resource for inference. We had a couple of hurdles to jump over, but we were able to get it done with the tools we had available.

A final table showing the results of the project.

If you have your own data hurdles to jump over and would like some help or advice in implementing a machine learning pipeline on Snowflake, please contact us!

Free Generative AI Workshop

Additionally, we’re running a series of free Generative AI workshops that have helped guide many readers forward in their data journeys. Sign up today for unbiased AI/ML advice!
