August 23, 2021

A Spark Developer’s Guide to Snowpark

By Travis Hegner

As a Spark developer who uses the Snowflake Data Cloud, you’ve undoubtedly heard all the buzz around Snowpark. Having the ability to execute arbitrary Scala code in your Snowflake compute environment could be game-changing! You begin to wonder how this works in a practical way, and ask yourself the following questions:

  • What is the architecture of a Snowpark job?
  • What does the Snowpark API look like?
  • What are the benefits of migrating to Snowpark?
  • What are the drawbacks of migrating to Snowpark?
  • How difficult will it be to migrate my jobs to Snowpark?
  • How do I deploy and run a Snowpark job in production?

Let’s explore some answers to these questions.

What is the Architecture of a Snowpark Job?

A Snowpark job is conceptually very similar to a Spark job in the sense that the overall execution happens across multiple JVMs. The job begins life in a client JVM running externally to Snowflake. This can be on your workstation, in an on-premises data center, or on a cloud-based compute resource. This JVM authenticates to Snowflake and establishes a “session” that will be used to interact with the Snowflake Cloud Data Platform.
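As a minimal sketch of that first step, the client builds a session from a map of connection properties; every value below is a placeholder you would substitute with your own account details (or a more secure authentication method than a plain password).

    import com.snowflake.snowpark._

    // Build a session from the client (driver) JVM.
    // All values are placeholders -- substitute your own account details.
    val session = Session.builder
      .configs(Map(
        "URL"       -> "https://<account_identifier>.snowflakecomputing.com",
        "USER"      -> "<user>",
        "PASSWORD"  -> "<password>",
        "ROLE"      -> "<role>",
        "WAREHOUSE" -> "<warehouse>",
        "DB"        -> "<database>",
        "SCHEMA"    -> "<schema>"
      ))
      .create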

You can think of this client JVM as analogous to Spark’s “driver” process. Effectively, the client JVM orchestrates transformations and triggers actions that execute within the Snowflake compute environment.

[Diagram: two headings, “Client JVM” and “Snowflake Cloud Compute,” followed by a few lines of code]

Much like Spark, a “transformation” causes no data movement or processing; it only adds a step to a lazily built pipeline. An “action” triggers the data to be processed through all of the accumulated transformations to produce its result.

Common Snowpark Operations

  • Transformations: select, filter, join, pivot
  • Actions: collect, write, first, count
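To make the laziness concrete, here is a hypothetical pipeline (the table and column names are invented for illustration); nothing is executed in Snowflake until the final count():

    import com.snowflake.snowpark.functions._

    // Transformations only -- no SQL is sent to Snowflake yet.
    val recent = session.table("ORDERS")                       // hypothetical table
      .filter(col("ORDER_DATE") >= lit("2021-01-01"))
      .select(col("CUSTOMER_ID"), col("AMOUNT"))

    // The action: Snowpark generates the equivalent SQL and runs it in Snowflake.
    val rowCount = recent.count()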

Also, like Spark, one should be careful about calling certain functions that will download data into the memory of the running client/driver application. Functions like “.collect()” and “.take()” will do just that. This type of action is perfectly fine if your DataFrame is small, but if it’s too large to fit into memory, it could lead to an “Out Of Memory” error.
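If you only need to inspect a handful of rows locally, one safer pattern (sketched here against the hypothetical DataFrame above) is to bound the result before collecting it:

    // Pull back only a bounded sample rather than the entire result set.
    val preview = recent.limit(100).collect()
    preview.foreach(println)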

What Does the Snowpark API Look Like?

The Snowpark library allows you to “read” each of your Snowflake tables as a DataFrame with a very familiar API. When you “read” a table, you are merely creating a reference to it in memory. You can apply any number of transformations to this reference, building up a transformation chain that will only be processed when you perform an “action”.

At the time you perform an action, the Snowpark library will analyze your transformation chain and generate an equivalent SQL query which gets passed up to the Snowflake compute resources to be executed. The generated query is logged by your client/driver application, so it’s easier to debug if something goes wrong, or if you’d like to test the query manually using the Snowflake UI.
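If you’d rather not dig through the logs, the DataFrame can also describe the queries it will run; a quick sketch, again using the hypothetical recent DataFrame from above:

    // Print the query plan, including the SQL Snowpark will send to Snowflake.
    recent.explain()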

If your transformation is simply too complex to be represented as DataFrame operations, Snowpark offers the ability to register both permanent and temporary “User Defined Functions” (UDFs). As explained in the UDF Registration documentation, you can create UDFs with anywhere from 0 to 10 parameters (passed as DataFrame Columns). The UDF processes one record at a time and returns a value for a new Column that can be appended to your DataFrame via “.withColumn()”, used as part of a “.select()”, or used in any other place that requires a column expression.
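As an illustration (the table, column names, and logic here are invented), a temporary, anonymous UDF can be created and applied like this:

    import com.snowflake.snowpark.functions.{col, udf}

    // Register a temporary, anonymous UDF. The function body executes inside
    // Snowflake, one row at a time.
    val cleanName = udf((raw: String) => raw.trim.toLowerCase)

    // Apply the UDF as a new column; it could equally be used inside a .select().
    val cleaned = session.table("CUSTOMERS")                   // hypothetical table
      .withColumn("CLEAN_NAME", cleanName(col("RAW_NAME")))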

Strung together correctly, these powerful features let you build a complete data transformation pipeline that runs entirely within the Snowflake Cloud Data Platform! This removes the need to move data out of Snowflake’s network into a dedicated compute environment and back.

What are the Benefits of Migrating to Snowpark?

If your Spark workload can be migrated to Snowpark, you will see some amazing benefits right away. First and foremost, you no longer have to move data out of the platform to your Spark cluster and back, so you’ll immediately save time, compute, and bandwidth.

Your production pipelines will also greatly benefit from the structured nature of using the DataFrame API to describe your transformations. For example, if you attempt to query a table by passing the following raw query to Snowflake (notice the erroneous comma), an error will occur at runtime.

				
    select c1, c2, from my_table

But, if you write the following query (with a similar erroneous comma) using the Snowpark DataFrame API, your error will be caught nearly immediately at compile-time.

				
    myDF.select(col("c1"), col("c2"),).show()

Using CI/CD best practices, this code would never make it to production — saving countless hours of debugging and troubleshooting.

What are the Drawbacks of Migrating to Snowpark?

In Spark, it’s possible to use a strongly typed Dataset API and perform distributed map and reduce functions. This sometimes results in slightly lower performance, but makes certain problems easier to solve. In Snowpark, you can simulate this workflow with UDFs and Aggregations, but this sometimes makes the code a little more convoluted.

Snowpark is missing the ability to create “User Defined Aggregate Functions” (UDAFs), which would be a more performant and easier-to-understand way to represent some aggregation problems. You can work around this with some clever use of “.groupBy()”, “array_agg()”, and a UDF that processes each array, but a dedicated UDAF framework would likely be more performant, easier to read, and easier to maintain.
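A rough sketch of that workaround follows; the table, columns, and median logic are all hypothetical, and the cast to string is one way to work within the limited array element types discussed below.

    import com.snowflake.snowpark.functions.{array_agg, col, udf}
    import com.snowflake.snowpark.types.StringType

    // UDF that receives an entire group's values as one array and reduces them.
    // Elements are cast to strings because UDF array parameters only support a
    // limited set of element types.
    val arrayMedian = udf((values: Array[String]) => {
      val sorted = values.map(_.toDouble).sorted
      sorted(sorted.length / 2)
    })

    val medians = session.table("MEASUREMENTS")                // hypothetical table
      .groupBy(col("SENSOR_ID"))
      .agg(array_agg(col("READING").cast(StringType)).as("READINGS"))
      .select(col("SENSOR_ID"), arrayMedian(col("READINGS")).as("MEDIAN_READING"))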

The data types supported for UDF parameters are somewhat limited. In an ideal world, the Array type would support any other supported type as its base, including nesting arrays. You could get creative with the use of boilerplate custom types and variants, but that adds a bit of overhead. A native “Vector” type supporting sparse and dense vectors and multidimensional arrays of numeric types would also be helpful for some machine learning workloads.

How Difficult is it to Migrate Jobs to Snowpark?

So you’ve considered the risks, and the benefits outweigh the drawbacks for your workload. How hard will it be to migrate?

The answer depends on a couple of factors. The easiest jobs to migrate are those that can be expressed purely in the Snowpark DataFrame API. If your workflow already uses the Spark SQL API, the migration will be nearly copy/paste, as the comparison below suggests.
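For a sense of how close the two APIs are, compare this hypothetical Spark snippet with its Snowpark equivalent (table and column names are invented, and each snippet assumes an existing SparkSession spark or Snowpark session with the corresponding functions import):

    // Spark (import org.apache.spark.sql.functions._)
    val sparkCounts = spark.table("ORDERS")
      .filter(col("STATUS") === lit("SHIPPED"))
      .groupBy(col("REGION"))
      .count()

    // Snowpark (import com.snowflake.snowpark.functions._) -- nearly identical
    val snowparkCounts = session.table("ORDERS")
      .filter(col("STATUS") === lit("SHIPPED"))
      .groupBy(col("REGION"))
      .count()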

If your Spark job is a little more complex and makes use of a combination of the DataFrame API and standard UDFs, it will be a little harder to migrate, but still relatively easy as your code’s logic shouldn’t have to change much.

If your Spark workflow currently makes use of any UDAFs, or uses the RDD API, you’ll have some work cut out for you in rewriting that logic to work within the DataFrame API. This is doable but will require some development and validation time. In all fairness, if you are still using the RDD API, you should be considering a migration to at least the Spark DataFrame API anyway for its performance benefits; and if you are making that jump, then the jump to the Snowpark DataFrame API isn’t any further.

There are differences in a couple of the built-in SQL functions (e.g., “collect_list()” vs. “array_agg()”), as well as in how some DataFrame methods are handled. For example, in Spark, “.pivot()” is executed against a grouped dataset, while in Snowpark it is executed against the DataFrame itself and the grouping is implicitly assumed from the remaining columns (a brief comparison is sketched after the checklist below). These and other minor differences mean your migration should be tested thoroughly before running in production. At a minimum, make sure that:

  • The SQL functions you utilize are available in Snowpark.
  • Your UDF parameters have supported data types.
  • Your DataFrame code compiles against the Snowpark DataFrame API.
  • Your output data is generated exactly as you expect.
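
Here is the pivot comparison referenced above, as a hedged sketch; the table, columns, and pivot values are invented, and the exact signatures should be confirmed against each library’s documentation.

    // Spark: pivot is called on a grouped dataset
    // (assumes an existing SparkSession `spark` and Spark's functions import).
    val sparkPivot = spark.table("SALES")
      .groupBy("YEAR")
      .pivot("QUARTER", Seq("Q1", "Q2", "Q3", "Q4"))
      .sum("AMOUNT")

    // Snowpark: pivot is called on the DataFrame itself; the grouping is implied
    // by the remaining columns (assumes an existing Snowpark `session` and
    // com.snowflake.snowpark.functions._).
    val snowparkPivot = session.table("SALES")
      .pivot(col("QUARTER"), Seq("Q1", "Q2", "Q3", "Q4"))
      .agg(sum(col("AMOUNT")))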

How to Run & Deploy Snowpark in a Production Pipeline

You’ve crossed the migration hurdle, but now you need to run your shiny new code in production. What does that look like?

Because Snowpark is a client library, you’ll still need some compute resources outside of the Snowflake environment. While there are many ways this could be accomplished, let’s explore one architecture that should work fairly well.

Following the example set in Document Classification on Snowpark:

  1. Create a Maven project that uses the Maven Shade Plugin to produce an executable uber jar
  2. Create a centralized Git repository to store this code
  3. Create a CI/CD pipeline which:
    1. Builds and tests your uber jar
    2. Drops it into a Docker image based on the official openjdk image
    3. Pushes that image to a Docker registry
  4. Set up an external process to run this image periodically at your desired time interval

As long as your Snowpark client/driver application never pulls a large amount of data back locally, the compute resources required to run this container should be minimal. For reference, here is a sample production pipeline implemented in AWS.

[Diagram: a sample production pipeline implemented in AWS]

If your team would like expert help in migrating your Spark workloads to Snowpark, or evaluating whether it’s possible, contact phData today!

Interested in Learning More About Snowpark?

Be sure to check out our other Snowpark how-tos, walkthroughs, and installation guides!
