How Do I Access Snowflake Using AWS Glue?

Most data engineers will perform ETL in one form or the other before loading data into a data warehouse. With the recent trend in movement towards ELT, a lot of data warehousing tools now do the heavy lifting of compute in the overall ETL/ELT process. 

With this trend, many of the latest data integration tools now support push down, which pushes all  the compute from the ETL tool to the data warehouse tool itself.

In this blog, let’s look at how to integrate AWS Glue with the Snowflake Data Cloud to transform and load data into Snowflake. This post will also showcase how Glue pushes down the compute to Snowflake, thereby making use of Snowflake’s processing power in data loading and transformation.

Things to Consider Before Getting Started

For this blog, we will be using AWS CloudFormation to set up the overall infrastructure in the AWS environment. The data will be housed in an S3 bucket and will be processed by AWS Glue. 

It should be noted that Glue will push most of the computing to Snowflake. The following diagram shows a high-level representation of data integration with AWS Glue and Snowflake.

A diagram titled, "Data Integration Using Glue and Snowflake that contains a number of icons and pathways.

S3 Bucket Configuration

Let’s assume that the AWS environment already exists and data exists in an S3 bucket. It is not included in the CloudFormation template. Usually, code and data reside in separate S3 buckets. But in this example, let’s assume that both code and data reside in the same bucket. Configure the S3 bucket structure like below.

Different folders in S3 serve different purposes as explained below.

  1. code – Holds Glue Spark Python Code and config file.
  2. connectors – Holds all supporting libraries and jar files
  3. data – Holds data files to be processed.
  4. temp – Glue temp folder

Glue and Snowflake integration Steps

  • CloudFormation Script to setup the following:
    • IAM role
    • IAM policies 
    • AWS glue job
  • Create S3 bucket and folder, Add Spark and JDBC jars to that folder (S3 bucket in the same region as AWS Glue). We will be using Glue 2.0 with Spark 2.4.
  • Configure Snowflake credentials into AWS SecretsManager

Step 1: Setup Snowflake Environment

  • Set up a Snowflake account.
  • Create Snowflake Database and Schema.
    • Create a database in Snowflake.
    • Create a schema in Snowflake where the target table will reside.

Step 2: Create AWS Services using CloudFormation Script

AWS CloudFormation is a solution that allows you to set up AWS services using a template programmatically. For this blog, we will use the AWS CloudFormation template to set up a Glue Job. We are assuming that the S3 bucket already exists.

The CloudFormation script will set up the following:

  • Create a new IAM role if one doesn’t already exist. 
  • Add S3 bucket resource in IAM Role Policies.
  • Add SecretsManager resource within IAM Role Policies (see detailed steps below for configuring Snowflake credentials in Secrets Manager).
  • Add Glue resources within IAM Role Policies.
  • Add PySpark code (JDBC & Spark connectors locations) within Glue Job.
				
					AWSTemplateFormatVersion: "2010-09-09"
Description: >
This Template Configures a Job to Load Data into a Snowflake Table using Glue
Parameters:
ProjectBucket:
  Type: String
  MinLength: "1"
  Description: "S3 Bucket Containing the Data Lake Data"
CodeBucket:
  Type: String
  Description: "S3 bucket containing glue code"
SecretName:
  Type: String
  Description: "The secret containing the Snowflake login information"
Resources:
SnowflakeGlueJobRole:
  Type: "AWS::IAM::Role"
  Properties:
    AssumeRolePolicyDocument:
      Version: '2012-10-17'
      Statement:
        - Effect: Allow
          Principal:
            Service:
              - glue.amazonaws.com
          Action:
            - sts:AssumeRole
    RoleName: 'glue-snowflake-iam-role'
    Policies:
      - PolicyName: glue-snowflake-s3-policy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - "s3:GetObject"
                - "s3:PutObject"
                - "s3:ListBucket"
                - "s3:DeleteObject"
              Resource:
                - !Sub "arn:aws:s3:::${ProjectBucket}"
                - !Sub "arn:aws:s3:::${ProjectBucket}/*"
      - PolicyName: glue-snowflake-cloudwatch-policy
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
               - "logs:CreateLogGroup"
               - "logs:CreateLogStream"
               - "logs:PutLogEvents"
              Resource:
               - "arn:aws:logs:*:*:/aws-glue/*"
      - PolicyName: "glue-snowflake-secret-manager"
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
            - Effect: "Allow"
              Action: [
                  "secretsmanager:GetSecretValue",
                  "secretsmanager:DescribeSecret"
              ]
              Resource: [
                  !Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*"
              ]
    Path: "/"
SnowflakeGlueJob:
  Type: 'AWS::Glue::Job'
  Properties:
    Name: 'glue-to-snowflake-copy'
    Description: ''
    Role: !Ref SnowflakeGlueJobRole
    Command:
      Name: glueetl
      PythonVersion: '3'
      ScriptLocation: !Sub 's3://${CodeBucket}/code/glue_copy_snowflake.py'
    GlueVersion: '2.0'
    WorkerType: G.1X
    NumberOfWorkers: 2
    Timeout: 480
    DefaultArguments:
     '--extra-jars': !Sub 's3://${CodeBucket}/connectors/snowflake-jdbc-3.13.22.jar,s3://${CodeBucket}/connectors/spark-snowflake_2.11-2.9.3-spark_2.4.jar'
     '--extra-files': !Sub 's3://${CodeBucket}/code/config.yaml'
     '--TempDir' : !Sub 's3://${CodeBucket}/temp'
     '--spark-event-logs-path' : !Sub 's3://${CodeBucket}/sparkHistoryLogs/'
     '--enable-metrics': true
     '--enable-spark-ui' : true
				
			

Integrating Snowflake Credentials into Secret Manager

Secrets Manager is an AWS service that provides the feature of storing passwords and automated password rotations, which are key components of secure storage. To begin:

  • Create a secret for Snowflake by selecting the ‘Other type of Secret’ option.
  • Fill in all the required credential information and save the secret.
  • Record this Secret Manager information for CloudFormation deployment.

In this example, we will store Snowflake User ID and password in Secret Manager. If Key Pair authentication is used, the Private Key passphrase can be stored in Secret Manager.

A screenshot from the Secret Manager UI

Step 3: Deploy CloudFormation Template

AWS CloudFormation template can be deployed in multiple ways. In this example, we will manually deploy the template by going to the CloudFormation screen. Upload the template file and click the Next button.

A screenshot from Secrets Manager that is titled, "Create Stack"

Give a name to the stack and fill in the required information like code bucket, data bucket, and Secret Manager name.

Another screenshot from Secret Manager. It's titled, "Stack Name"

Click the Next button and then click on the Create Stack button. The services listed in the template will be deployed one by one. Finally, it will show a status as CREATE_COMPLETE for the entire stack (shown below).

A screenshot from the Secret Manager UI that shows a number of statuses in green and some in blue.

All these steps can be automated either using AWS CLI or even through a CICD pipeline using services like GitHub Actions, Jenkins, etc.

Step 4: Create Glue PySpark Code

Create a PySpark code and config file and upload it in the S3 bucket (code folder) mentioned in the AWS Glue Job in CloudFormation. The config file will be used to store information like Snowflake account, DB, Schema, etc. This information can also be stored in the Secret Manager if needed.

At a high level, the code will perform the following steps:

  • Read Snowflake and AWS configuration within the PySpark code.
  • Read Snowflake credentials from AWS Secrets Manager.
  • Read from S3 CSV data, convert to dataframe , apply transformations (if any) and load into target Snowflake table.

The config file will look like this below:

				
					glue_to_snowflake:
   url: <Snowflake account>
   role: <Snowflake role>
   warehouse: <Snowflake warehouse name>
   db: <Snowflake DB name>
   schema: <Snowflake Schema name>
   input_s3_bucket: <Input S3 bucket>
				
			

PySpark code to read from S3 and load into Snowflake.

				
					import sys
import boto3
import json
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from py4j.java_gateway import java_import
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
java_import(spark._jvm, SNOWFLAKE_SOURCE_NAME)
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())
import yaml
 
# Getting DB credentials from Secrets Manager
client = boto3.client("secretsmanager", region_name="us-east-1")
 
get_secret_value_response = client.get_secret_value(
       SecretId="snowflake_credentials"
)
 
secret = get_secret_value_response['SecretString']
secret = json.loads(secret)
 
sf_username = secret.get('SNOWFLAKE_USER')
sf_password = secret.get('SNOWFLAKE_PWD')
 
with open("config.yaml", "r") as stream:
  try:
      c=yaml.safe_load(stream)
  except yaml.YAMLError as exc:
      print(exc)
 
 
sfOptions = {
"sfURL" : c['glue_to_snowflake']['url'],
"sfRole" : c['glue_to_snowflake']['role'],
"sfUser" : sf_username,
"sfPassword" : sf_password,
"sfDatabase" : c['glue_to_snowflake']['db'],
"sfSchema" : c['glue_to_snowflake']['schema'],
"sfWarehouse" : c['glue_to_snowflake']['warehouse'],
}
 
 
# Employee data base schema
schema = StructType(
      [
          StructField("Employee_id", IntegerType(), nullable=True),
          StructField("First_name", StringType(), nullable=True),
          StructField("Last_name", StringType(), nullable=True),
          StructField("Email", StringType(), nullable=True),
          StructField("Salary", IntegerType(), nullable=True),
          StructField("Department", StringType(), nullable=True)
      ]
  )
 
# Read from s3 location into a Spark Dataframe
input_bucket = c['glue_to_snowflake']['input_s3_bucket']
s3_uri = f"s3://{input_bucket}/data/employee.csv"
 
# Read from S3 and load into a Spark Dataframe
df = spark.read.option("header", True).csv(path=s3_uri,sep=',',schema=schema)
 
# Perform any kind of transformations on your data
df1 = df.filter( (df.Department  == "IT") | (df.Department  == "Mkt") )
 
# Write the Data Frame contents back to Snowflake in a new table - stage_employee
df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "stage_employee").mode("overwrite").save()
 
job.commit()

				
			

Step 5: Execute Glue Job

Glue Jobs can be run through the AWS Glue console ( job run button of the Glue Job) as part of Testing. For automation or in the production environment, we could use AWS Managed Airflow using cron scheduler facility, AWS step functions or any other relevant services. This is not covered as part of this blog. Glue Job runs can be monitored through AWS Cloud Watch.

A pop-up window from AWS

Step 6: Verify Data in Snowflake

Next, let’s login into Snowflake to see if the data is loaded successfully. Before checking the data, click on the History tab in Snowflake. In this example, Snowflake shows multiple steps being pushed down from Glue into Snowflake.

For example, the following statement in PySpark is converted into SQL in Snowflake:

				
					df.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "glue_snowflake2").mode("overwrite").save()
				
			

Snowflake connector takes the data in S3 and does the following:

  1. Create a temporary internal stage and load data in the dataframe into the internal stage.
  2. Checks if the target table exists and creates the table if it doesn’t exist.
  3. Load the data from the internal stage into the table using copy command.
  4. Drop the temporary internal stage.

All these will be evident from Snowflake query history as shown below.

A screenshot of the Snowflake Query History

For data validation, open a worksheet and query the data in the target table.

Note

AWS Glue 3.0 requires Spark 3.1.1 – Snowflake Spark Connector 2.10.0-spark_3.1 or higher, and Snowflake JDBC Driver 3.13.14 can be used.

Coming Soon

As part two of this Glue-Snowflake integration, I will be covering the automation of this whole process(CICD) using GHA (GitHub Actions).

Closing

While this blog covers the ELT process at a very high level, there are more steps to be done in a real production environment. The Glue Jobs can be orchestrated using schedulers like Airflow/Step functions and the whole deployment process can be automated using Jenkins/GitHub Actions. 

Our data engineers are highly skilled in ETL/ELT processes in any cloud environment. If you need any help in data engineering, feel free to reach out to the experts at phData!

More to explore

Accelerate and automate your data projects with the phData Toolkit

Data Coach is our premium analytics training program with one-on-one coaching from renowned experts.