Most data engineers perform ETL in one form or another before loading data into a data warehouse. With the recent shift toward ELT, many data warehousing tools now do the heavy lifting of compute in the overall ETL/ELT process.
Following this trend, many of the latest data integration tools support pushdown, which pushes the compute from the ETL tool down to the data warehouse itself.
In this blog, let’s look at how to integrate AWS Glue with the Snowflake Data Cloud to transform and load data into Snowflake. This post will also showcase how Glue pushes down the compute to Snowflake, thereby making use of Snowflake’s processing power in data loading and transformation.
Things to Consider Before Getting Started
For this blog, we will be using AWS CloudFormation to set up the overall infrastructure in the AWS environment. The data will be housed in an S3 bucket and will be processed by AWS Glue.
Note that Glue will push most of the compute down to Snowflake. The following diagram shows a high-level representation of data integration with AWS Glue and Snowflake.
S3 Bucket Configuration
Let's assume that the AWS environment already exists and the data resides in an S3 bucket; the bucket itself is not created by the CloudFormation template. Usually, code and data reside in separate S3 buckets, but in this example, let's assume that both code and data reside in the same bucket. Configure the S3 bucket structure as shown below.
Different folders in S3 serve different purposes, as explained below.
- code – Holds the Glue Spark Python code and the config file.
- connectors – Holds all supporting libraries and JAR files.
- data – Holds the data files to be processed.
- temp – Glue temporary folder.
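Assuming a single bucket (the bucket name below is just a placeholder), the layout referenced by the CloudFormation template and the PySpark code later in this post looks roughly like this:
s3://my-glue-snowflake-bucket/
    code/          glue_copy_snowflake.py, config.yaml
    connectors/    snowflake-jdbc-3.13.22.jar, spark-snowflake_2.11-2.9.3-spark_2.4.jar
    data/          employee.csv
    temp/          (scratch space for Glue)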
Glue and Snowflake Integration Steps
- CloudFormation script to set up the following:
  - IAM role
  - IAM policies
  - AWS Glue job
- Create an S3 bucket and folders, and add the Spark and JDBC JAR files to that bucket (the S3 bucket must be in the same region as AWS Glue). We will be using Glue 2.0 with Spark 2.4; a scripted upload sketch follows this list.
  - Latest Snowflake Spark Connector
  - Latest Snowflake JDBC Driver (verify the JDBC driver version supported by the Spark Connector version you are using).
- Configure Snowflake credentials in AWS Secrets Manager.
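If you prefer to script the upload rather than use the console, a minimal boto3 sketch might look like this; the local file paths and bucket name are placeholders:
import boto3

s3 = boto3.client("s3")
bucket = "my-glue-snowflake-bucket"  # placeholder; use your own bucket

# Glue script and config go into the code/ prefix referenced by the CloudFormation template
s3.upload_file("glue_copy_snowflake.py", bucket, "code/glue_copy_snowflake.py")
s3.upload_file("config.yaml", bucket, "code/config.yaml")

# Snowflake Spark connector and JDBC driver go into the connectors/ prefix
s3.upload_file("spark-snowflake_2.11-2.9.3-spark_2.4.jar", bucket, "connectors/spark-snowflake_2.11-2.9.3-spark_2.4.jar")
s3.upload_file("snowflake-jdbc-3.13.22.jar", bucket, "connectors/snowflake-jdbc-3.13.22.jar")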
Step 1: Set Up the Snowflake Environment
- Set up a Snowflake account. For this setup, you will need a Snowflake account with a user and a virtual warehouse configured.
- Create a Snowflake database and schema (a scripted sketch follows this list).
  - Create a database in Snowflake.
  - Create a schema in Snowflake where the target table will reside.
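These objects can be created from a Snowflake worksheet or scripted. Below is a minimal sketch using the snowflake-connector-python package (not used elsewhere in this walkthrough); the account identifier, user, role, and object names are placeholders:
import snowflake.connector

# Placeholder connection details; in practice, pull the password from Secrets Manager
conn = snowflake.connector.connect(
    account="myaccount",   # placeholder account identifier
    user="GLUE_USER",
    password="********",
    role="SYSADMIN",
)

cur = conn.cursor()
cur.execute("CREATE WAREHOUSE IF NOT EXISTS GLUE_WH WITH WAREHOUSE_SIZE = 'XSMALL'")
cur.execute("CREATE DATABASE IF NOT EXISTS GLUE_DEMO_DB")
cur.execute("CREATE SCHEMA IF NOT EXISTS GLUE_DEMO_DB.STAGING")
conn.close()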
Step 2: Create AWS Services using CloudFormation Script
AWS CloudFormation is a service that allows you to set up AWS services programmatically using a template. For this blog, we will use a CloudFormation template to set up the Glue job. We are assuming that the S3 bucket already exists.
The CloudFormation script will set up the following:
- Create a new IAM role if one doesn't already exist.
- Add S3 bucket resource in IAM Role Policies.
- Add SecretsManager resource within IAM Role Policies (see detailed steps below for configuring Snowflake credentials in Secrets Manager).
- Add Glue resources within IAM Role Policies.
- Create the Glue job, referencing the PySpark code location and the JDBC and Spark connector JAR locations.
AWSTemplateFormatVersion: "2010-09-09"
Description: >
  This Template Configures a Job to Load Data into a Snowflake Table using Glue
Parameters:
  ProjectBucket:
    Type: String
    MinLength: "1"
    Description: "S3 Bucket Containing the Data Lake Data"
  CodeBucket:
    Type: String
    Description: "S3 bucket containing glue code"
  SecretName:
    Type: String
    Description: "The secret containing the Snowflake login information"
Resources:
  SnowflakeGlueJobRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - sts:AssumeRole
      RoleName: 'glue-snowflake-iam-role'
      Policies:
        - PolicyName: glue-snowflake-s3-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "s3:GetObject"
                  - "s3:PutObject"
                  - "s3:ListBucket"
                  - "s3:DeleteObject"
                Resource:
                  - !Sub "arn:aws:s3:::${ProjectBucket}"
                  - !Sub "arn:aws:s3:::${ProjectBucket}/*"
        - PolicyName: glue-snowflake-cloudwatch-policy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource:
                  - "arn:aws:logs:*:*:/aws-glue/*"
        - PolicyName: "glue-snowflake-secret-manager"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: "Allow"
                Action:
                  - "secretsmanager:GetSecretValue"
                  - "secretsmanager:DescribeSecret"
                Resource:
                  - !Sub "arn:aws:secretsmanager:${AWS::Region}:${AWS::AccountId}:secret:${SecretName}*"
      Path: "/"
  SnowflakeGlueJob:
    Type: 'AWS::Glue::Job'
    Properties:
      Name: 'glue-to-snowflake-copy'
      Description: ''
      Role: !Ref SnowflakeGlueJobRole
      Command:
        Name: glueetl
        PythonVersion: '3'
        ScriptLocation: !Sub 's3://${CodeBucket}/code/glue_copy_snowflake.py'
      GlueVersion: '2.0'
      WorkerType: G.1X
      NumberOfWorkers: 2
      Timeout: 480
      DefaultArguments:
        '--extra-jars': !Sub 's3://${CodeBucket}/connectors/snowflake-jdbc-3.13.22.jar,s3://${CodeBucket}/connectors/spark-snowflake_2.11-2.9.3-spark_2.4.jar'
        '--extra-files': !Sub 's3://${CodeBucket}/code/config.yaml'
        '--TempDir': !Sub 's3://${CodeBucket}/temp'
        '--spark-event-logs-path': !Sub 's3://${CodeBucket}/sparkHistoryLogs/'
        '--enable-metrics': true
        '--enable-spark-ui': true
Integrating Snowflake Credentials into Secrets Manager
Secrets Manager is an AWS service that securely stores credentials such as passwords and supports automated password rotation, which are key components of secure storage. To begin:
- Create a secret for Snowflake by selecting the ‘Other type of Secret’ option.
- Fill in all the required credential information and save the secret.
- Record the secret name; it will be needed when deploying the CloudFormation template.
In this example, we will store the Snowflake user ID and password in Secrets Manager. If key pair authentication is used, the private key passphrase can be stored in Secrets Manager instead.
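If you prefer to script this step, the same secret can be created with boto3. The secret name and key names below match what the PySpark code later in this post reads; the credential values are placeholders:
import json
import boto3

client = boto3.client("secretsmanager", region_name="us-east-1")

# Key names must match what the Glue script expects (SNOWFLAKE_USER / SNOWFLAKE_PWD)
client.create_secret(
    Name="snowflake_credentials",
    SecretString=json.dumps({
        "SNOWFLAKE_USER": "GLUE_USER",   # placeholder
        "SNOWFLAKE_PWD": "********"      # placeholder
    })
)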
Step 3: Deploy CloudFormation Template
An AWS CloudFormation template can be deployed in multiple ways. In this example, we will deploy the template manually from the CloudFormation console. Upload the template file and click Next.
Give the stack a name and fill in the required parameters: the code bucket, the data bucket, and the secret name.
Click Next and then Create Stack. The resources listed in the template will be deployed one by one, and the stack will eventually show a status of CREATE_COMPLETE (shown below).
All of these steps can also be automated using the AWS CLI or SDKs, or through a CI/CD pipeline using services like GitHub Actions or Jenkins.
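As an illustration, a minimal boto3 sketch of the same deployment might look like the following. The stack name and local template file name are placeholders; CAPABILITY_NAMED_IAM is required because the template creates a named IAM role:
import boto3

cfn = boto3.client("cloudformation")

with open("glue_snowflake_template.yaml") as f:   # placeholder file name for the template above
    template_body = f.read()

cfn.create_stack(
    StackName="glue-snowflake-stack",
    TemplateBody=template_body,
    Parameters=[
        {"ParameterKey": "ProjectBucket", "ParameterValue": "my-glue-snowflake-bucket"},
        {"ParameterKey": "CodeBucket", "ParameterValue": "my-glue-snowflake-bucket"},
        {"ParameterKey": "SecretName", "ParameterValue": "snowflake_credentials"},
    ],
    Capabilities=["CAPABILITY_NAMED_IAM"],  # the template creates a named IAM role
)

# Optionally wait until the stack reaches CREATE_COMPLETE
cfn.get_waiter("stack_create_complete").wait(StackName="glue-snowflake-stack")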
Step 4: Create Glue PySpark Code
Create the PySpark code and a config file and upload them to the code folder of the S3 bucket referenced by the AWS Glue job in CloudFormation. The config file will be used to store information like the Snowflake account URL, database, schema, etc. This information can also be stored in Secrets Manager if needed.
At a high level, the code will perform the following steps:
- Read Snowflake and AWS configuration within the PySpark code.
- Read Snowflake credentials from AWS Secrets Manager.
- Read the CSV data from S3 into a DataFrame, apply transformations (if any), and load it into the target Snowflake table.
The config file will look like this:
glue_to_snowflake:
  url:
  role:
  warehouse:
  db:
  schema:
  input_s3_bucket:
Below is the PySpark code to read from S3 and load the data into Snowflake:
import sys
import json

import boto3
import yaml
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from py4j.java_gateway import java_import
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

# Initialize Spark, the Glue context, and the Glue job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Enable pushdown so the Snowflake connector translates Spark operations into Snowflake SQL
java_import(spark._jvm, SNOWFLAKE_SOURCE_NAME)
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

# Get the Snowflake credentials from Secrets Manager
client = boto3.client("secretsmanager", region_name="us-east-1")
get_secret_value_response = client.get_secret_value(
    SecretId="snowflake_credentials"
)
secret = json.loads(get_secret_value_response['SecretString'])
sf_username = secret.get('SNOWFLAKE_USER')
sf_password = secret.get('SNOWFLAKE_PWD')

# Read the Snowflake connection details from the config file shipped with the job (--extra-files)
with open("config.yaml", "r") as stream:
    try:
        c = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        print(exc)

sfOptions = {
    "sfURL": c['glue_to_snowflake']['url'],
    "sfRole": c['glue_to_snowflake']['role'],
    "sfUser": sf_username,
    "sfPassword": sf_password,
    "sfDatabase": c['glue_to_snowflake']['db'],
    "sfSchema": c['glue_to_snowflake']['schema'],
    "sfWarehouse": c['glue_to_snowflake']['warehouse'],
}

# Employee dataset schema
schema = StructType(
    [
        StructField("Employee_id", IntegerType(), nullable=True),
        StructField("First_name", StringType(), nullable=True),
        StructField("Last_name", StringType(), nullable=True),
        StructField("Email", StringType(), nullable=True),
        StructField("Salary", IntegerType(), nullable=True),
        StructField("Department", StringType(), nullable=True)
    ]
)

# Read the CSV data from S3 into a Spark DataFrame
input_bucket = c['glue_to_snowflake']['input_s3_bucket']
s3_uri = f"s3://{input_bucket}/data/employee.csv"
df = spark.read.option("header", True).csv(path=s3_uri, sep=',', schema=schema)

# Perform any transformations on the data
df1 = df.filter((df.Department == "IT") | (df.Department == "Mkt"))

# Write the DataFrame contents back to Snowflake into a new table - stage_employee
df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "stage_employee").mode("overwrite").save()

job.commit()
Step 5: Execute Glue Job
Glue jobs can be run from the AWS Glue console (the job's Run button) as part of testing. For automation or in a production environment, you could use Amazon Managed Workflows for Apache Airflow with a cron schedule, AWS Step Functions, or any other relevant service; this is not covered in this blog. Glue job runs can be monitored through Amazon CloudWatch.
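For illustration, the job can also be triggered and polled with boto3; the job name matches the one defined in the CloudFormation template:
import boto3

glue = boto3.client("glue")

# Kick off the job defined in the CloudFormation template
run = glue.start_job_run(JobName="glue-to-snowflake-copy")

# Check the run status (RUNNING, SUCCEEDED, FAILED, etc.)
status = glue.get_job_run(JobName="glue-to-snowflake-copy", RunId=run["JobRunId"])
print(status["JobRun"]["JobRunState"])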
Step 6: Verify Data in Snowflake
Next, let's log in to Snowflake to see if the data was loaded successfully. Before checking the data, click on the History tab in Snowflake. In this example, the history shows multiple steps being pushed down from Glue into Snowflake.
For example, the following statement in PySpark is converted into SQL in Snowflake:
df1.write.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("dbtable", "stage_employee").mode("overwrite").save()
The Snowflake connector takes the DataFrame contents and does the following:
- Creates a temporary internal stage and loads the DataFrame contents into it.
- Checks whether the target table exists and creates it if it doesn't.
- Loads the data from the internal stage into the table using the COPY command.
- Drops the temporary internal stage.
All of this is evident in the Snowflake query history, as shown below.
For data validation, open a worksheet and query the data in the target table.
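The same check can be scripted as well; here is a minimal sketch using snowflake-connector-python, reusing the placeholder connection details from earlier (the database and schema should match what you put in config.yaml):
import snowflake.connector

conn = snowflake.connector.connect(
    account="myaccount",       # placeholder
    user="GLUE_USER",
    password="********",
    database="GLUE_DEMO_DB",   # placeholder; match config.yaml
    schema="STAGING",          # placeholder; match config.yaml
)

cur = conn.cursor()
cur.execute("SELECT Department, COUNT(*) FROM stage_employee GROUP BY Department")
for row in cur.fetchall():
    print(row)
conn.close()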
Note
AWS Glue 3.0 uses Spark 3.1.1; with it, use Snowflake Spark Connector 2.10.0-spark_3.1 or higher, and Snowflake JDBC Driver 3.13.14 can be used.
Coming Soon
As part two of this Glue-Snowflake integration series, I will cover automating this whole process (CI/CD) using GitHub Actions (GHA).
Closing
While this blog covers the ELT process at a very high level, there are more steps involved in a real production environment. Glue jobs can be orchestrated using schedulers like Airflow or Step Functions, and the whole deployment process can be automated using Jenkins or GitHub Actions.
Our data engineers are highly skilled in ETL/ELT processes in any cloud environment. If you need any help in data engineering, feel free to reach out to the experts at phData!