Snowflake ML: How to do Document Classification with Snowpark

Join us on this technical walkthrough as we determine the practicality of the Snowflake Data Cloud and Snowpark for machine learning use-cases.

Document Vectors

With the success of word embeddings, it’s understood that entire documents can be represented in a similar way. In this case study, we will represent each document as a single vector: an IDF-weighted average of the word embeddings that make up the document.

This form of unsupervised machine learning should put documents that use many similar words near each other in the resulting vector space, which should allow us to try some interesting classification and visualization tasks.

Preparing the Data

We will use a BBC news articles dataset found on Kaggle. This dataset consists of 2,225 news articles in five different categories: business, entertainment, politics, sports, and technology. We will use several combinations of tools to create a vector that attempts to represent each article in Euclidean space. We will create more Snowflake tables to act as parts of a feature store, explore those features for value, and try our hand at a couple of different models to classify a hold-out set.

The data is available in Snowflake and has been partitioned with 1,781 random articles in a training set, and the remaining 446 in a test set, held out from all aspects of the training. Our Snowflake table looks like this:

A table displaying a sample set of data

The class column is either “test” or “train,” and represents to which partition the article belongs. The ID column is the unique ID of the article within a category, and the category column is the category to which the article belongs. Take note that only the category/ID combination is unique in this dataset, not the ID alone.
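
Because IDs repeat across categories, any lookup or join needs both columns. As a minimal sketch in Python, using made-up rows rather than the real table, here is why the ID alone is not a safe key:

```python
# Toy rows (hypothetical, not taken from the real table): the same ID
# appears in two different categories.
rows = [
    {"category": "business", "id": 1, "class": "train"},
    {"category": "sport", "id": 1, "class": "test"},
    {"category": "sport", "id": 2, "class": "train"},
]

# Keying on ID alone silently overwrites one of the two articles with id=1.
by_id = {r["id"]: r for r in rows}

# Keying on the (category, id) pair keeps every article.
by_key = {(r["category"], r["id"]): r for r in rows}

print(len(by_id), len(by_key))
```

This is the reason the later grouping and join steps in this walkthrough use both CATEGORY and ID.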

Preparing our Snowpark Environment

Snowpark 0.6.0 is available in Maven at the OSGeo Release Repository. This makes setting up our dependencies and preparing for a production environment much easier: we can use the Maven Shade plugin to create an uber jar that includes all of our dependencies, and then execute that jar as a standard Java application, whose environment acts as our Snowpark “driver” process.

This lets us follow our usual best practices, such as creating a CI/CD pipeline to deploy our jar artifact and reading configuration variables from our environment at runtime. To start, generate a Maven-based Scala project, make sure the pom.xml is set to Scala version 2.12.11, and add the OSGeo repository.

				
					<repositories>
  <repository>
    <id>OSGeo Release Repository</id>
    <url>https://repo.osgeo.org/repository/release/</url>
  </repository>
</repositories>
				
			

We’ll also need a couple of extra dependencies.

				
					<dependency>
  <groupId>com.snowflake</groupId>
  <artifactId>snowpark</artifactId>
  <version>0.6.0</version>
</dependency>
<dependency>
  <groupId>org.scalanlp</groupId>
  <artifactId>breeze_2.12</artifactId>
  <version>1.2</version>
</dependency>
				
			

And finally for the pom.xml, we’ll need to set up the Maven Shade plugin to create an app entry point, and include all dependencies in the jar.

				
					<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>io.phdata.App</mainClass>
          </transformer>
        </transformers>
        <artifactSet>
        </artifactSet>
      </configuration>
    </execution>
  </executions>
  <configuration>
      <filters>
        <filter>
          <artifact>*:*</artifact>
          <excludes>
            <exclude>META-INF/*.SF</exclude>
            <exclude>META-INF/*.DSA</exclude>
            <exclude>META-INF/*.RSA</exclude>
          </excludes>
        </filter>
      </filters>
  </configuration>
</plugin>

				
			

With this configuration, we can execute our compiled uber jar with a very simple command like the following:

				
					java -jar target/my-jar-0.0.1.jar
				
			

With the pom.xml finished, let’s start our “main()” method of our “App” object. The first thing we need to do is establish our Snowflake session. The session is passed implicitly when we create UDFs, and is used to read tables as dataframes. The following code should start the body of your “main()” method.

				
					val account = sys.env.getOrElse("SF_ACCOUNT", "")
val schema = sys.env.getOrElse("SF_SCHEMA", "")
val warehouse = sys.env.getOrElse("SF_WAREHOUSE", "")
val database = sys.env.getOrElse("SF_DATABASE", "")
val user = sys.env.getOrElse("SF_USER", "")
val role = sys.env.getOrElse("SF_ROLE", "")
val pk_file = sys.env.getOrElse("SF_PK_FILE", "")
val pk_password = sys.env.getOrElse("SF_PK_PASSWORD", "")

val config = Map[String, String] (
  "URL" -> s"https://$account.snowflakecomputing.com",
  "USER" -> user,
  "PRIVATE_KEY_FILE" -> pk_file,
  "PRIVATE_KEY_FILE_PWD" -> pk_password,
  "ROLE" -> role,
  "WAREHOUSE" -> warehouse,
  "DB" -> database,
  "SCHEMA" -> schema
)

val session = Session.builder.configs(config).create


				
			

With our environment established, we can start generating our features!

Training the Word Vectors

Extracting Tokens

First, we create a UDF that takes in a string and breaks it into individual words, excluding punctuation. A UDF is a function that operates on the values of a single row in a Snowflake table and returns a value for that row. Snowpark DataFrames, on the other hand, allow us to apply operations (including UDFs) across all of the rows in a table.

				
					val extractWordsUDF = udf((article: String) => {
  val pattern = "\\b\\w+\\b".r

  pattern.findAllIn(article.toLowerCase).toArray
})
				
			

This UDF can be applied to a text column in Snowflake, and it returns an array of strings which match any sequence of word characters as defined in Scala’s regular expression engine. We will apply this UDF to each article, and write the resultant tokens to a local file to feed a gensim Word2Vec model in Python later.

				
					val df = session.read.table("BBC_ARTICLES")
    .filter(col("CLASS")===lit("train"))

val words = df.withColumn(
    "ARR",
    extractWordsUDF(df("WORDS"))
).select("ARR").collect()


val content = words
    .map(_.getListOfVariant(0).map(_.asString()).mkString(","))
    .mkString("\n")


import java.io._
val pw = new PrintWriter(new File("words.txt"))
pw.write(content)
pw.close()


				
			

This code writes a text file where each line represents the words of each article in order separated by commas.
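
To make the file format concrete, here is a rough Python equivalent of the tokenizing and joining steps, run on two made-up sentences. The `re.ASCII` flag is an assumption made so that Python’s `\w` matches only ASCII word characters, which is how `\w` behaves by default in Java and Scala regular expressions; the function names are hypothetical.

```python
import re

# ASCII-only word characters; an assumption made so that Python's \w
# behaves like the default (non-Unicode) \w in Java/Scala regexes.
WORD = re.compile(r"\b\w+\b", re.ASCII)

def extract_words(article):
    """Lower-case the text and return its word tokens in order."""
    return WORD.findall(article.lower())

def to_line(article):
    """One article becomes one comma-separated line."""
    return ",".join(extract_words(article))

articles = ["Shares rose 5% today.", "It's a win!"]  # toy examples
content = "\n".join(to_line(a) for a in articles)
print(content)
```

Note that digits count as word characters, while an apostrophe splits a word in two, so “It’s” becomes the tokens “it” and “s”.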

Training Embeddings

With our text file full of words, we can train a simple gensim Word2Vec model.
				
					import os
import logging
from gensim.utils import tokenize
from gensim.models import Word2Vec

logging.basicConfig(
    format='%(asctime)s : %(levelname)s : %(message)s',
    level=logging.INFO
)

with open("words.txt") as file:
    lines = file.readlines()

docs = []
for line in lines:
    docs.append(line.strip().split(","))

model = Word2Vec(sentences=docs, vector_size=300, workers=4, epochs=1000)
model.save('w2v.model')


				
			

Uploading Word Vectors

Now, in order to make use of our word vectors in Snowflake, we have to create a table containing each word, and its associated vector. This will allow us to join against this table and do work with the vectors. Unfortunately, snowflake.connector.pandas_tools does not seem to natively support correctly uploading a dataframe where a column has an embedded array. Our code will upload the vector formatted as a JSON array string.
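
As a small illustration of that string format, the sketch below builds a JSON array string from a short made-up vector and parses it back. Here `json.loads` only stands in for what Snowflake’s `parse_json` does on the server side, and `to_vecstr` is a hypothetical helper name.

```python
import json

def to_vecstr(vector):
    """Format a sequence of numbers as a JSON array string."""
    # float() also converts NumPy scalars to plain Python floats
    return "[" + ",".join(str(float(v)) for v in vector) + "]"

vecstr = to_vecstr([0.5, -1.25, 3.0])
print(vecstr)

# Parsing the string recovers the original numbers.
print(json.loads(vecstr))
```

Storing the text in a VARCHAR column first and converting it afterwards keeps the upload itself limited to plain strings.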

				
					create table bbc_wordvecs (word varchar, vecstr varchar, vector array)
				
			

With our table created, let’s upload our words and vector strings.

				
					import os
import sys
import pandas as pd
from gensim.models import Word2Vec

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.asymmetric import dsa
from cryptography.hazmat.primitives import serialization

import snowflake.connector as sc
import snowflake.connector.pandas_tools as pt

model = Word2Vec.load("w2v.model")

words = []
for key, val in [(key, model.wv[key]) for key in model.wv.index_to_key]:
  words.append({
    "WORD": key,
    "VECSTR": '['+','.join([str(v) for v in list(val)])+']',
  })

df = pd.DataFrame(words)

#snowflake params
account = os.getenv('SF_ACCOUNT')
warehouse = os.getenv('SF_WAREHOUSE')
database = os.getenv('SF_DATABASE')
schema = os.getenv('SF_SCHEMA')
un = os.getenv('SF_USER')
pw = os.getenv('SF_PK_PASSWORD')
pk = os.getenv('SF_PK_FILE')

with open(pk, "rb") as key:
    p_key = serialization.load_pem_private_key(
        key.read(),
        password=pw.encode(),
        backend=default_backend()
    )

pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

con = sc.connect(
    user=un,
    private_key=pkb,
    account=account,
    warehouse=warehouse,
    database=database,
    schema=schema
)

success, num_chunks, num_rows, output = pt.write_pandas(conn=con, df=df, table_name='BBC_WORDVECS')
if not success:
    print(f"Error writing data to snowflake table BBC_WORDVECS")
    print(output)
    sys.exit(1)

print("Successfully wrote data to snowflake:")
print(f"Num Chunks: {num_chunks}")
print(f"num_rows: {num_rows}")
print(output)
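
One fragile spot in a script like this is that `os.getenv` returns `None` for an unset variable, so a missing setting only surfaces later as an unrelated-looking error (for example when calling `.encode()` on the password). A minimal sketch of a fail-fast helper follows; `require_env` and the `SF_DEMO_ACCOUNT` variable are hypothetical names used only for this illustration.

```python
import os

def require_env(name):
    """Return the value of an environment variable, or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

# Example with a hypothetical variable set just for this demonstration.
os.environ["SF_DEMO_ACCOUNT"] = "my_account"
print(require_env("SF_DEMO_ACCOUNT"))
```

This helper treats an empty string as missing, which assumes the private key is always encrypted with a password.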


				
			

A second step is required to parse that vector string and store our vector as an array of floats.

				
					update bbc_wordvecs set vector=parse_json(vecstr)
				
			

This results in a table that contains a vector for every word in our corpus. A vector is stored as a simple Array of floating point numbers.

A sample table of data in Snowflake

Now that our word vectors are ready, we can get to work creating our document vectors.

Generating Document Vectors

Setup Variables

First, we set up the Dataframes we want to work with, as well as some other variables we’ll need.

				
					val articles = session.read.table("BBC_ARTICLES")

val vectors = session.read.table("BBC_WORDVECS")
  .select("WORD", "VECTOR")

val corpus = articles.filter(col("CLASS")===lit("train")).count


				
			

Extracting Words

Next, we break the articles into separate words or tokens, flatten by the word array, and select only the needed columns.

				
					val exp = articles
  .withColumn("ARR", extractWordsUDF(articles("WORDS")))
  .flatten(col("ARR"))

val words = exp.select(
    exp("CLASS"),
    exp("CATEGORY"),
    exp("ID"),
    exp("INDEX"),
    exp("VALUE").as("WORD")
)


				
			

Calculating IDF Scores

The well-known TF*IDF score is a way to measure a word’s importance across a corpus of documents. We will calculate the IDF of every word across our training corpus in order to weight each word’s vector by how well that word distinguishes a document from the rest of the corpus. The TF portion of that formula is implicitly included when we average a document’s words together.

				
					val idfs = words
  .filter(col("CLASS")===lit("train"))
  .select("CATEGORY", "ID", "WORD")
  .distinct()
  .groupBy("WORD").count()
  .withColumn("IDF", log(lit(2), lit(corpus)/col("COUNT")))
  .select("WORD", "IDF")
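
The same calculation can be sketched in plain Python on a toy corpus. This is only an illustration of the formula log2(N / df), where N is the number of training documents and df is the number of documents containing the word; the documents below are made up.

```python
import math

def idf_scores(docs):
    """Map each word to log2(N / df), where df counts documents containing it."""
    n_docs = len(docs)
    doc_freq = {}
    for tokens in docs:
        for word in set(tokens):  # count each word once per document
            doc_freq[word] = doc_freq.get(word, 0) + 1
    return {w: math.log2(n_docs / df) for w, df in doc_freq.items()}

docs = [
    ["the", "market", "rose"],
    ["the", "match", "ended"],
    ["the", "market", "fell"],
    ["the", "film", "opened"],
]
idfs = idf_scores(docs)
print(idfs["the"], idfs["market"], idfs["film"])  # prints: 0.0 1.0 2.0
```

A word that appears in every training document gets an IDF of 0, so its vector contributes nothing to the weighted sum, while rarer words are weighted more heavily.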
				
			

Scaling and Averaging Word Vectors

Now that we have a flattened Dataframe of words in each article, a Dataframe of each word and its vectors, and a Dataframe of each word and its IDF score, we can join these elements together and do some scaling and aggregation.

First, we’ll need to create a couple more UDFs. This is where we start to run into some of the limitations in the current version of Snowpark. For some reason, passing an “Array[Float]” or “Array[Double]” is not supported currently, so we are forced to pass our floating point numbers as “Array[String]”. At least when we do this, Snowflake implicitly coerces the floats to string when we call the UDF during the Dataframe transformation.

				
					//UDF to scale word vector by idf score
val scaleVector = udf((idf: Double, vector: Array[String]) => {
  //resorting to casting to/from strings to do UDF arithmetic
  "["+(DenseVector(vector.map(_.toDouble)) * idf).toArray.mkString(",")+"]"
})
				
			

As you can see, we have to do some ugly work-arounds to convert our vector elements to actual “Double” values, and then pass the resulting vector back to Snowflake as a JSON array string. This is preferred over passing back “Array[String]” because we are now able to easily execute a “parse_json()” on the returned value and Snowflake will treat our new vector as an “Array” of “Float” internally. This helps prevent Snowflake from hitting some serialization limits during Dataframe transformations.
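
For readers more comfortable in Python, the string round trip performed by this UDF can be sketched as follows. This mirrors the logic only and is not Snowpark code; `json.loads` again stands in for `parse_json`.

```python
import json

def scale_vector(idf, vector):
    """Scale string-encoded vector elements by idf and return a JSON array string."""
    scaled = [float(x) * idf for x in vector]
    return "[" + ",".join(str(x) for x in scaled) + "]"

out = scale_vector(2.0, ["0.5", "-1.0", "0.25"])
print(out)              # prints: [1.0,-2.0,0.5]
print(json.loads(out))  # prints: [1.0, -2.0, 0.5]
```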

Another wrench in our plan is that we don’t have a way to create user defined aggregate functions. User defined functions are great for operating on one record at a time, but we don’t have a way to operate on a grouped set of records. To work around this, we do a “.groupBy().agg(array_agg())” on our Dataframe, which collects values into an Array column per record. This works as an alternative, except in our case we need to do aggregations on vectors that we are storing as arrays. 

This means we now must pass an “Array[Array[Float]]”, “Array[Array[Double]]”, or even “Array[Array[String]]”, but as you may have guessed, these are all unsupported types for UDFs. Unfortunately, we have to do even uglier string-based conversions. As before, Snowflake automatically coerces the Array[Array[Double]] into an Array[String] for us to operate on.

				
					//UDF to average weighted word vectors into a docvec
//each element of passed array is a string based array
//because snowpark doesn't support nested arrays
val averageVectors = udf((vecs: Array[String]) => {
  val l = vecs(0).slice(1, vecs(0).length-1).split(",").length

  var res = DenseVector.zeros[Double](l)
  vecs.foreach(v => {
    val a = v.slice(1, v.length-1).split(",")
    res = res + DenseVector(a.map(_.toDouble))
  })

  res = res * (1.0/vecs.length)

  "["+res.toArray.mkString(",")+"]"
})
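
The averaging step can be sketched the same way in Python. Each input element is a JSON array string, as in the UDF above, and the result is the element-wise mean returned in the same string format; the function name and inputs are illustrative.

```python
import json

def average_vectors(vecs):
    """Element-wise mean of vectors given as JSON array strings."""
    parsed = [json.loads(v) for v in vecs]
    totals = [0.0] * len(parsed[0])
    for vec in parsed:
        for i, x in enumerate(vec):
            totals[i] += x
    mean = [t / len(parsed) for t in totals]
    return "[" + ",".join(str(x) for x in mean) + "]"

docvec = average_vectors(["[1.0,2.0]", "[3.0,6.0]"])
print(docvec)  # prints: [2.0,4.0]
```

Like the Scala version, this sketch assumes at least one vector per document and fails on an empty input.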


				
			

As you can see in this UDF, we have to do some less-than-ideal string manipulation to calculate our desired result. On the plus side, our work-arounds work well enough, and we can compute our document vectors and store them back into a new Snowflake table. Let’s create a table to hold our document vectors.

				
					create table bbc_docvecs
  (class varchar, category varchar, id bigint, docvec array)
				
			

With our UDFs created and our table ready, let’s get those document vectors calculated!

				
					words.join(idfs, "WORD")
  .join(vectors, "WORD")
  .withColumn("SCALEDVEC", parse_json(
     scaleVector(idfs("IDF"), vectors("VECTOR"))
  ))
  .groupBy("CLASS", "CATEGORY", "ID")
  .agg(array_agg(col("SCALEDVEC")).as("SVECTORS"))
  .select(
     col("CLASS"), col("CATEGORY"), col("ID"), parse_json(
       averageVectors(col("SVECTORS"))
     ).as("DOCVEC")
  )
  .write.mode(SaveMode.Overwrite).saveAsTable("BBC_DOCVECS")

				
			

After this code runs, our table will be populated with the document vectors for our news articles.

A table displaying a sample set of data

Visualizing our Document Vectors with T-SNE

Overview

For those unfamiliar, t-SNE (t-distributed stochastic neighbor embedding) is an embedding technique that reduces high-dimensional data to two or three dimensions for visualization purposes. If our document vectors carry any meaning, we would expect articles with similar content to be nearer each other in vector space. The t-SNE plot effectively allows us to check that hypothesis visually.

Generate the Plot

For brevity, we will leave off the Snowflake connection code since it was provided above. For this snippet, use the same pattern as in the “Uploading Word Vectors” section to make sure the “con” (Snowflake connector) variable is already populated.

				
					import pandas as pd
import numpy as np
from sklearn.manifold import TSNE
import plotly.express as px
#previous connection code here

cur = con.cursor()
q = cur.execute(
  f"select a.class, a.category, a.id, a.words, d.docvec::string as dv from bbc_articles a inner join bbc_docvecs d on a.category=d.category and a.id=d.id"
)

df = q.fetch_pandas_all()

vecs = []
for k, v in df['DV'].items():
    vecs.append([float(f) for f in v[1:len(v)-1].split(",")])

arr = np.array(vecs)

emb = TSNE().fit_transform(arr)
df['emb_x'] = pd.Series(emb[:, 0])
df['emb_y'] = pd.Series(emb[:, 1])
df['title'] = df['WORDS'].str.split('\n\n').str[0]

fig = px.scatter(
    df, x="emb_x", y="emb_y",
    color="CATEGORY", symbol=df["CLASS"],
    symbol_sequence=["cross", "circle"], hover_data=['ID', 'title']
)
fig.write_html("plot.html")


				
			

Executing this code from the command line writes a self-contained HTML file that can be opened in a browser for an interactive chart. Again, we have to treat our array of floating point numbers as a string to prevent conversion issues when reading the table into a pandas DataFrame.

As you can see, the document vectors cluster together very nicely within each category, giving us a very good foundation for doing downstream machine learning tasks.

Classifying our Test Partition

Preparing our Scala Environment

For a more rigorous test of our document vectors, let’s train a Random Forest Classifier, embed it in an uber jar, and execute it with Snowpark. First, we take some hints about training, exporting, and executing our model from Executing Machine Learning Models In Snowpark and Machine Learning on Snowflake: Clustering Data with Snowpark. Then we’ll modify our Scala environment a bit. We are going to have to introduce a new dependency for PMML.

				
					<dependency>
  <groupId>org.pmml4s</groupId>
  <artifactId>pmml4s_2.12</artifactId>
  <version>0.9.11</version>
</dependency>
				
			

Since we’ve been focusing on building a fully deployable jar file, we will add the model PMML file as a resource which gets embedded into our final jar. In the “<build>” then “<resources>” tag of our pom.xml, we will add the following section.

				
					<resource>
  <directory>${project.basedir}/src/main/resources/</directory>
  <filtering>false</filtering>
</resource>


				
			

Then create the corresponding directory from the top level of our Scala project.

				
					mkdir src/main/resources
				
			

Training our Model

Next, we will train our model using Python’s scikit-learn library. With the same df that we used for our visualization earlier, execute the following.

				
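from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
# skl_to_pmml is assumed here to come from the nyoka package
from nyoka import skl_to_pmml

# parse each document vector string ("[0.1,0.2,...]") into a list of floats
vecs = []
for k, v in df['DV'].items():
    vecs.append([float(f) for f in v[1:len(v)-1].split(",")])

df['vector'] = pd.Series(vecs)

x_train = pd.DataFrame(
  [np.array(l) for l in df[df['CLASS']=="train"]['vector']]
)
x_test = pd.DataFrame(
  [np.array(l) for l in df[df['CLASS']=="test"]['vector']]
)

# the CATEGORY column holds the label for each training article
y_train = df[df['CLASS']=="train"]['CATEGORY'].to_numpy()

pipe = Pipeline([('model', RandomForestClassifier())])
pipe.fit(x_train, y_train)

# export the fitted pipeline; one column name per vector dimension
skl_to_pmml(
  pipeline=pipe,
  col_names=[str(i) for i in range(300)],
  pmml_f_name="classifier.pmml"
)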


				
			

After a successful run, you’ll produce a file called “classifier.pmml”. We have to copy this file into our Scala project at src/main/resources. Once there, the PMML model will be embedded into our jar file whenever we run the Maven “package” command.

Classify Document Vectors

Now that we have our Scala environment ready, let’s add the necessary code to do inference right inside of a Snowflake UDF!

				
					val model = Model(scala.io.Source.fromResource("classifier.pmml"))

//UDF to classify document vectors
val classify = udf((docvec: Array[String]) => {
  model.predict(docvec.map(_.toDouble)).last.toString
})
				
			

With our “classify” UDF created, we can pass in a “DOCVEC” and get back a prediction that represents the most likely category for that document! Let’s run a batch inference on all of the documents in our test partition right inside of our Snowflake Warehouse and then use the Snowpark DataFrame API to calculate and print a confusion matrix.

				
					val test = session.read.table("BBC_DOCVECS")
  .filter(col("CLASS")===lit("test"))

test.withColumn("PREDICT", classify(test("DOCVEC")))
  .select(col("CATEGORY"), col("PREDICT"), lit(1).as("NUM"))
  .pivot(
    "PREDICT",
    Seq("business", "entertainment", "politics", "sport", "tech")
  )
  .agg(sum(col("NUM")))
  .select(
    col("CATEGORY"),
    coalesce(col("'business'"), lit(0)).as("business"),
    coalesce(col("'entertainment'"), lit(0)).as("entertainment"),
    coalesce(col("'politics'"), lit(0)).as("politics"),
    coalesce(col("'sport'"), lit(0)).as("sport"),
    coalesce(col("'tech'"), lit(0)).as("tech")
  )
  .sort(col("CATEGORY"))
  .show()


				
			

When we execute the above, we get a pretty good looking confusion matrix showing that our Random Forest model actually learned some useful features from our document vectors.
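
For reference, the pivot-and-coalesce logic above amounts to counting (actual, predicted) pairs and filling empty cells with zero. Here is a minimal Python sketch of that idea; the pairs are made up for illustration and are not results from this model.

```python
from collections import Counter

LABELS = ["business", "entertainment", "politics", "sport", "tech"]

def confusion_matrix(pairs):
    """Rows are actual categories, columns are predictions; missing cells are 0."""
    counts = Counter(pairs)  # (actual, predicted) -> count
    return {
        actual: [counts.get((actual, predicted), 0) for predicted in LABELS]
        for actual in LABELS
    }

# Made-up (actual, predicted) pairs for illustration only, not real results.
pairs = [
    ("business", "business"),
    ("business", "politics"),
    ("sport", "sport"),
    ("sport", "sport"),
    ("tech", "business"),
]
matrix = confusion_matrix(pairs)
for actual in LABELS:
    print(f"{actual:<14}", matrix[actual])
```

Correct predictions land on the diagonal, so a good classifier produces a matrix with large diagonal values and small values elsewhere. Unlike the Snowpark query, this sketch also emits an all-zero row for a category that has no test articles.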

Analyzing our Results

A table written in Shell code displaying some data

There you have it! We’ve successfully created a document classifier that uses Snowflake as a feature store, and Snowpark as our primary compute resource for inference. We had a couple of hurdles to jump over, but we were able to get it done with the tools we had available.

If you have your own data hurdles to jump over and would like some help or advice in implementing a machine learning pipeline on Snowflake, please contact us!
