November 4, 2021

How to Connect to Snowflake Using Spark

By Meesum Ali

Spark processes large volumes of data, and the Snowflake Data Cloud is a modern data platform; together, they help enterprises make more data-driven decisions. But how does one go about connecting these two platforms?

Whether you’re interested in using Spark to execute SQL queries on a Snowflake table or you just want to read data from Snowflake and explore it using the Spark framework, this blog will walk you through: 

  • The installation and configuration of the Spark Snowflake Connector
  • How to establish a Snowflake connection using PySpark
  • How to establish a Snowflake connection using Scala
  • Running Spark on Snowflake with Snowpark

How to Install and Configure The Spark Snowflake Connector

To use the Spark Snowflake connector, you will need to make sure that you have the Spark environment configured with all of the necessary dependencies. The dependencies in question are the Snowflake JDBC driver, Spark Snowflake Connector (SSC), and the Spark framework itself.

Step 1

The first thing you need to do is decide which version of the SSC you would like to use and then find the Scala and Spark versions that are compatible with it. The SSC can be downloaded from Maven (an online package repository). In the repository, there are different package artifacts for each supported version of Scala, and within the Scala versions, there are different versions of the SSC.

In addition, there are separate artifacts that support different versions of the Spark framework. The SSC packages follow the naming convention X.X.X-spark_Y.Y, where the three X’s represent the connector version and the two Y’s represent the supported Spark version. For example, 2.8.4-spark_3.0 is connector version 2.8.4 built for Spark 3.0.
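If you manage dependencies with a build tool instead of passing them on the command line, the same Maven coordinates can be declared there. The sbt snippet below is only a sketch, using the same illustrative versions as the spark-shell command in Step 2; pick the artifacts that match your own Scala and Spark versions.

// build.sbt -- a sketch with illustrative versions (mirroring the spark-shell example in Step 2)
scalaVersion := "2.12.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"            % "3.0.1" % "provided",
  "net.snowflake"    %  "snowflake-jdbc"       % "3.12.17",
  "net.snowflake"    %  "spark-snowflake_2.12" % "2.8.4-spark_3.0"
)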


Step 2

Once you have found the version of the SSC you would like to use, the next step is to download and install its jar file, along with the jar files for the dependencies mentioned above, on your Spark cluster.

The spark-shell --packages command can be used to install both the Spark Snowflake Connector and the Snowflake JDBC driver in your local Spark cluster. It can also be used to install any other missing packages required by the SSC.

Example command:

spark-shell --packages net.snowflake:snowflake-jdbc:3.12.17,net.snowflake:spark-snowflake_2.12:2.8.4-spark_3.0

How to Connect to Snowflake using PySpark

To use Spark from Python, the PySpark script included in the Spark distribution is required. Just as we installed the SSC and the Snowflake JDBC driver for the Spark shell, we have to do the same for the PySpark script using the command given below.

Example command:

bin/pyspark --packages net.snowflake:snowflake-jdbc:3.8.0,net.snowflake:spark-snowflake_2.11:2.4.14-spark_2.4

PRO TIP: Be sure to include the SSC and the Snowflake JDBC driver in your classpath environment variables so that the PySpark distribution recognizes the installation.

Now that we have our environment set up, the next step is to run the Python code that will allow us to connect to Snowflake using PySpark. The first step in our code is to import the necessary libraries and establish our Spark context for a local Spark cluster.

Example Code:

# import required libraries
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# configure and start a local Spark context
spark_conf = SparkConf().setMaster("local").setAppName("example App")
mySparkContext = SparkContext(conf=spark_conf)
spark = SQLContext(mySparkContext)

Once the Spark context is set up, the next step is to create a Python dictionary that contains the Snowflake parameters required to establish the connection. 

Required Parameters:

  • Account Identifier – Unique identifier for your Snowflake account, either created by your organization or automatically generated by Snowflake upon account creation
  • User Name – User name for your Snowflake account, created during account creation
  • Password – Password for your account, created during account creation
  • Database – Database in the Snowflake account that you are attempting to access
  • Schema – Schema within the database that you are attempting to access
  • Warehouse – Snowflake compute resources that are being used for your operations

Example Code:

# Snowflake connection parameters
sfparams = {
  "sfURL": "<account_identifier>.snowflakecomputing.com",
  "sfUser": "<user_name>",
  "sfPassword": "<password>",
  "sfDatabase": "<database>",
  "sfSchema": "<schema>",
  "sfWarehouse": "<warehouse>"
}

The final step is to read a Snowflake table into a Spark dataframe to verify that the connection has been established. To do this, specify the dbtable option on the Spark dataframe reader. If you would like to execute a custom query instead, specify the query option.

Example Code:

# read the full table
df = spark.read.format("snowflake") \
  .options(**sfparams) \
  .option("dbtable", "Employee") \
  .load()

# run a custom query
df = spark.read.format("snowflake") \
  .options(**sfparams) \
  .option("query", "SELECT * FROM Employee") \
  .load()

How to Connect to Snowflake using Scala

If the earlier installation and configuration steps have been completed, no additional configuration should be required to use the Snowflake Connector with Scala. All that is left to cover is the Scala code itself for connecting to and performing operations on Snowflake. 

The steps in the code are identical to Python: we create our connection parameters and pass them to the Spark dataframe reader when performing operations on Snowflake. As in Python, either the dbtable or the query option is passed to specify how we would like to read data from Snowflake.

Example Code:

import org.apache.spark.sql._

// Configure your Snowflake parameters
val sfparams = Map(
    "sfURL" -> "<account_identifier>.snowflakecomputing.com",
    "sfUser" -> "<user_name>",
    "sfPassword" -> "<password>",
    "sfDatabase" -> "<database>",
    "sfSchema" -> "<schema>",
    "sfWarehouse" -> "<warehouse>"
)

val SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

// Read a Snowflake table into a Spark dataframe
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfparams)
    .option("dbtable", "Employee")
    .load()

// Read a Snowflake table via a SQL query
val df: DataFrame = sqlContext.read
    .format(SNOWFLAKE_SOURCE_NAME)
    .options(sfparams)
    .option("query", "SELECT * FROM Employee")
    .load()
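Whether you connect from PySpark or Scala, a quick sanity check on the returned dataframe is an easy way to confirm that the connection is actually working. The sketch below simply reuses the df and the hypothetical Employee table from the example above.

// Print the schema and a few rows to confirm data is coming back from Snowflake
df.printSchema()
df.show(5)

// A count forces a full read through the connector -- a simple end-to-end check
println(s"Row count: ${df.count()}")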

Interested in Running Spark on Snowflake?

If you would like to bypass the Snowflake connector altogether, another option is Snowflake’s Snowpark. 

Snowpark is a developer framework created by Snowflake that lets you run Spark-style code directly on Snowflake without any detailed configuration to connect the two, since it is already integrated with Snowflake. Snowpark lets you write DataFrame code in Scala that is converted into equivalent SQL and passed to Snowflake, where the computation is performed. 

One of the great benefits of using this method to execute Spark code is that you will no longer have to move data in and out of a Spark cluster, saving you significantly on the bandwidth costs associated with moving data. If you would like to learn more about Snowpark, please check out our Spark Developer’s Guide to Snowpark.  
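To give a feel for the difference, here is a minimal Snowpark (Scala) sketch. It is an illustration only: the credential placeholders mirror the connector examples above, the Employee table is the same hypothetical table used earlier, and the session property names follow Snowflake’s Snowpark documentation.

import com.snowflake.snowpark._

// Session properties -- placeholder values, analogous to the connector parameters above
val configs = Map(
  "URL" -> "https://<account_identifier>.snowflakecomputing.com",
  "USER" -> "<user_name>",
  "PASSWORD" -> "<password>",
  "DB" -> "<database>",
  "SCHEMA" -> "<schema>",
  "WAREHOUSE" -> "<warehouse>"
)

// Create a Snowpark session -- no Spark cluster, SSC, or JDBC configuration involved
val session = Session.builder.configs(configs).create

// DataFrame operations are translated to SQL and executed on Snowflake compute
val employees = session.table("Employee")
employees.show()

// Arbitrary SQL can also be issued through the same session
session.sql("SELECT COUNT(*) FROM Employee").show()

Because the DataFrame operations compile down to SQL, the computation happens inside the Snowflake warehouse rather than in a separate Spark cluster, which is where the data-movement savings mentioned above come from.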

Conclusion

Hopefully, by now, you’re in a good position to install and configure the Snowflake Connector for Spark in your programming language of choice. For any additional questions on the Snowflake Connector, please take a look at the Overview of the Spark Connector in the Snowflake documentation.

Looking to get more out of your Snowflake account?

From solution design to 24×7 data pipeline monitoring to software and automation tools, we’re here to streamline many of the complex processes required to launch Snowflake.

FAQ

Can Spark SQL queries be run using Snowflake compute?

Yes. To run Spark SQL queries using Snowflake compute, you have to enable Pushdown processing in your Snowflake Connector session. Pushdown processing takes Spark SQL commands and converts them into Snowflake SQL statements, improving performance. Pushdown sessions can be enabled using functions in the Snowflake Connector Utils library. One advantage of Pushdown queries is that the Spark application needs less infrastructure on the Spark side, which helps control costs and better leverages the power of Snowflake.

Additionally, a developer may disable Pushdown query execution if they intend to use ML libraries that require the data to be pulled into the Spark executors to calculate model outputs.

Example:

//disable Pushdown
SnowflakeConnectorUtils.disablePushdownSession(spark)

//enable Pushdown
SnowflakeConnectorUtils.enablePushdownSession(spark)

Can DDL and DML statements be run on Snowflake tables through the connector?

Yes. To run DDL/DML statements on Snowflake tables, use the runQuery() function in the Snowflake Connector Utils library and pass your Snowflake authentication parameters, the same parameters that are required for reading tables. The runQuery() function is intended for DDL/DML statements, not for SQL statements that query a table, such as a SELECT command.

Example:

val sfparams = Map(
   "sfURL" -> "<account_identifier>.snowflakecomputing.com",
   "sfUser" -> "<user_name>",
   "sfPassword" -> "<password>",
   "sfDatabase" -> "<database>",
   "sfSchema" -> "<schema>",
   "sfWarehouse" -> "<warehouse>"
   )

SnowflakeConnectorUtils.runQuery(sfparams, "CREATE TABLE MY_TABLE (COLUMN_A INTEGER)")
