In this blog, we’ll introduce an incredibly flexible and powerful part of Snowpark that allows developers to create Python functions with their favorite packages and apply them to rows or batches of rows.
These functions take on three forms for different use cases and can be used and defined locally, or registered and saved for ongoing usage. The three forms are:
- User Defined Functions
- User Defined Table Functions
- Vectorized (Pandas) User Defined Functions
For Snowpark to successfully compile them, they must only use libraries available in Snowpark’s Conda repository.
Let’s get started!
User Defined Functions
User defined functions (UDFs) can receive multiple columns and other arguments and return one value per row of data. Here we’ll define a simple UDF that takes two columns and returns their sum.
We’ll start from an input table, df, with two integer columns, A and B, and then define and apply the UDF:
from snowflake.snowpark.functions import udf, col
from snowflake.snowpark.types import IntegerType

# Register a temporary UDF that adds two integer columns
@udf(name='a_plus_b', input_types=[IntegerType(), IntegerType()], return_type=IntegerType(), is_permanent=False, replace=True)
def a_plus_b(a: int, b: int) -> int:
    return a + b

# Add the result as a new column and display it
df.withColumn('A_PLUS_B', a_plus_b(col('A'), col('B'))).show()
This adds a new column, A_PLUS_B, holding the sum of A and B for each row.
Looking at the @udf decorator, we can see that we need to declare a few things:
- name='a_plus_b'; the name the function will be registered as in Snowflake (here it matches the name of the Python function)
- input_types=[IntegerType(), IntegerType()]; the Snowpark types of the columns containing our input values
- return_type=IntegerType(); the Snowpark type of the output column
- is_permanent=False; when set to False, this UDF won’t be kept outside of this session
- replace=True; if a UDF has already been registered with the same name, replace it. This is generally useful when you’re working locally, but be careful not to replace an important business function accidentally!
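To make the "one value per row" behavior concrete, here is a minimal plain-Python sketch that applies the same function body to a few rows without a Snowflake session. The sample rows and the apply_row_wise helper are hypothetical. They only mimic what withColumn does with a UDF and are not part of the Snowpark API.

```python
# Plain-Python sketch of row-by-row UDF semantics (no Snowflake session needed).
# The sample rows and apply_row_wise are hypothetical, for illustration only.

def a_plus_b(a: int, b: int) -> int:
    # Same body as the a_plus_b UDF above.
    return a + b

def apply_row_wise(rows, func, new_column, *arg_columns):
    """Return copies of the rows with new_column set to func applied to the named columns."""
    return [
        {**row, new_column: func(*(row[c] for c in arg_columns))}
        for row in rows
    ]

rows = [{"A": 1, "B": 2}, {"A": 10, "B": 20}, {"A": -3, "B": 3}]
result = apply_row_wise(rows, a_plus_b, "A_PLUS_B", "A", "B")

for row in result:
    print(row)
```

Each input row produces exactly one output value, so the result always has the same number of rows as the input.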
User Defined Table Functions
In the UDF example, we showed how to produce a new column of data calculated row-by-row with a UDF. User Defined Table Functions (UDTFs) have two differences we’ll highlight here.
Most importantly, UDTFs return a table, which can be applied with a lateral join back to the original table. In addition, UDTFs can process rows in user-defined partitions in a partition-aware fashion.
Let’s dig into an example.
We’ll start with a dataframe, df, with a category column (CATEGORY) and two value columns (VALUE1 and VALUE2).
Now we’ll define our UDTF as a class with three methods. The process method is mandatory. You can think of it as behaving like the UDF in the previous example, except that it yields a tuple for each input row. The other two methods, __init__ and end_partition, allow us to create partition-aware behavior.
In this case, process yields the sum of value1 and value2 for each row, and end_partition yields the total of value1 + value2 across the entire partition as an additional row.
from typing import Iterable, Tuple

from snowflake.snowpark.functions import udtf, col
from snowflake.snowpark.types import IntegerType

@udtf(name='totals', input_types=[IntegerType(), IntegerType()], output_schema=["total"], is_permanent=False, replace=True)
class totals:
    def __init__(self):
        # Running total for the current partition
        self.group_total = 0

    def process(self, value1: int, value2: int) -> Iterable[Tuple[int]]:
        # Called once per input row: emit the row sum and update the total
        self.group_total += (value1 + value2)
        yield (value1 + value2,)

    def end_partition(self):
        # Called once after the last row of a partition: emit the partition total
        yield (self.group_total,)

df.join_table_function(totals("VALUE1", "VALUE2").over(partition_by=col('CATEGORY'))).show()
Looking at the @udtf decorator, there is one notable change:
- output_schema=["total"]; here we state that the output table will have one column, named "total" (because only a name is given, its type comes from the return type hint on process)
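To see which rows the handler emits, it can help to drive the class by hand in plain Python. The sketch below is an illustration under stated assumptions: the sample rows and the run_partitioned helper are hypothetical, and the helper only imitates the calling order (a fresh handler per partition, process once per row, end_partition once at the end). It prefixes each emitted tuple with the partition key for readability and does not reproduce the full joined table Snowflake returns.

```python
# Local sketch of how a partitioned UDTF handler is driven (no Snowflake session).
# The sample rows and run_partitioned are hypothetical, for illustration only.
from itertools import groupby
from typing import Iterable, Tuple


class Totals:
    def __init__(self):
        self.group_total = 0

    def process(self, value1: int, value2: int) -> Iterable[Tuple[int]]:
        self.group_total += value1 + value2
        yield (value1 + value2,)

    def end_partition(self):
        yield (self.group_total,)


def run_partitioned(handler_cls, rows):
    """Group (category, value1, value2) rows by category and run one handler per group."""
    output = []
    ordered = sorted(rows, key=lambda r: r[0])  # stable sort keeps row order within a category
    for category, group in groupby(ordered, key=lambda r: r[0]):
        handler = handler_cls()  # fresh state for every partition
        for _, value1, value2 in group:
            for out in handler.process(value1, value2):
                output.append((category,) + out)
        for out in handler.end_partition():
            output.append((category,) + out)
    return output


rows = [("a", 1, 2), ("a", 3, 4), ("b", 10, 20)]
emitted = run_partitioned(Totals, rows)
for line in emitted:
    print(line)
```

Partition "a" emits one tuple per row plus a final total, so two input rows become three output rows. In Snowflake the order of rows within a partition is not guaranteed unless you also specify an ordering, so the fixed order here is a property of this local sketch only.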
Vectorized (Pandas) User Defined Functions
In the previous examples, the functions are applied row-by-row. They can be partition-aware (in the case of UDTFs) or they can be partition agnostic and simply run for each row. Snowpark provides a third option, vectorized UDFs, where computations are performed over an entire batch of rows at once.
pandas_udf is an alias of udf, strictly for vectorized use: it takes each batch of rows as a Pandas DataFrame or Series and returns a Pandas Series. Here we take a table, df, with two integer columns, A and B, and return their product:
import pandas as pd

# col is needed for the column references in the call below
from snowflake.snowpark.functions import pandas_udf, col
from snowflake.snowpark.types import IntegerType, PandasSeriesType

@pandas_udf(name='multiplier', input_types=[PandasSeriesType(IntegerType()), PandasSeriesType(IntegerType())], return_type=PandasSeriesType(IntegerType()), is_permanent=False, replace=True)
def multiplier(column1: pd.Series, column2: pd.Series) -> pd.Series:
    # Element-wise product over the whole batch
    return column1 * column2

df.withColumn('MULTIPLIED', multiplier(col('A'), col('B'))).show()
The arguments for a pandas_udf are identical to those for a UDF, but the input and return types must be some sort of vector. Remember that the batching is done for you, and the size of each batch is determined by the planner.
You can specify a max_batch_size, which caps the number of rows a batch can contain, but you cannot set the actual size a batch will be.
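Since the batch size is only capped and never guaranteed, a vectorized function should give the same answer however the rows are split. The plain-Python sketch below illustrates this, with lists standing in for Pandas Series. The run_in_batches helper is hypothetical, and it always fills each batch to the cap, which Snowflake does not promise to do.

```python
# Sketch of batch-wise (vectorized) execution with an upper bound on batch size.
# run_in_batches is hypothetical; Snowflake chooses the real batch sizes.

def multiplier(column1, column2):
    # Element-wise product of two equally long column vectors.
    return [x * y for x, y in zip(column1, column2)]

def run_in_batches(func, col_a, col_b, max_batch_size):
    """Call func once per batch of at most max_batch_size rows and concatenate the results."""
    results, calls = [], 0
    for start in range(0, len(col_a), max_batch_size):
        stop = start + max_batch_size
        results.extend(func(col_a[start:stop], col_b[start:stop]))
        calls += 1
    return results, calls

a = [1, 2, 3, 4, 5]
b = [10, 20, 30, 40, 50]

small, small_calls = run_in_batches(multiplier, a, b, max_batch_size=2)
large, large_calls = run_in_batches(multiplier, a, b, max_batch_size=100)

print(small, small_calls)
print(large, large_calls)
```

Both runs return the same values. Only the number of function calls changes, which is why element-wise logic like this multiplication is a natural fit for vectorized UDFs, while anything that needs to see a whole group of rows together is better suited to a partitioned UDTF.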
We’ve covered three different approaches to defining functions to operate on tables in Snowpark. Each has its unique features, and all are powerful ways to expand what you can accomplish with Snowflake.
Interested in learning more about how Snowpark UDFs can work for your business? Contact the phData Data Science team today for questions, best practices, advice or more information!