Looking for the best way to approach data ingestion within the Snowflake Data Cloud? Are you struggling to keep up with the breadth and speed of data being generated?
Defining best practices will depend on where you’re loading data from and how you want to load it into Snowflake.
Most enterprises have a wide variety of sources for data. It’s common to have a mix of on-premises systems, cloud systems, and vendor connectivity. Your enterprise may also take advantage of Internet of Things (IoT) devices which are constantly publishing data back to your system.
All this data drives business value — at least in theory. But the number of sources and volume of data can present a problem. In order for your data to provide business value, it needs to be available and curated. The faster that you can get your data into the hands of your analysts, customers, or executives, the larger impact it has on your enterprise — either with the competitive advantage found in more accurate data models or the ability to make better-informed decisions, faster.
There are three main areas of data performance that directly affect the availability to your end-users:
- Data Loading
- Data Transformation
- Data Modeling
In this blog post, we will focus on data loading; data transformation and data modeling will be covered later in this series. The conversation breaks down into four parts:
- How Do I Optimize Data Loading?
- General Ingestion Recommendations
- Which Method Should I Use?
- Common Data Ingestion Questions
How Do I Optimize Data Loading?
When loading data via a streaming data provider, such as Apache Kafka, the main tool that you will be using from Snowflake is Snowpipe. Snowpipe is a tool provided by Snowflake that allows for processing data as soon as it’s available within a stage.
Generally, this will be an external stage such as AWS Simple Storage Service (S3) or Azure Blob Storage. If you’re ingesting events or change data capture (CDC) based data, a streaming-based approach will be your best option for data ingestion. This allows for smaller micro-batch processing of the data, and easily facilitates distributed computing.
How Does Snowpipe Load My Data?
The specifics of how to set up Snowpipe will vary depending on what you’re using for an external stage. For this blog, we will focus on AWS S3. In order for Snowpipe to work, you will need to set up the following:
- S3 bucket
- IAM policy for Snowflake generated IAM user
- S3 bucket policy for IAM policy
- S3 event notifications (SQS/SNS)
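On the Snowflake side, the stage setup might look like the sketch below. This assumes the storage-integration approach; the bucket name, role ARN, and database object names are hypothetical placeholders for your own.

```sql
-- Storage integration that maps to the IAM role Snowflake will assume
create storage integration s3_ingest_int
  type = external_stage
  storage_provider = 'S3'
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::123456789012:role/snowflake-ingest-role'
  storage_allowed_locations = ('s3://my-ingest-bucket/events/');

-- External stage pointing at the bucket prefix that receives new files
create stage my_db.my_schema.events_stage
  url = 's3://my-ingest-bucket/events/'
  storage_integration = s3_ingest_int;
```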
Once all of the above has been set up, the workflow looks like this:
- File is placed into S3
- Snowpipe watches SQS for messages
- When Snowpipe sees a new message, the file to be processed is put into an internal queue
- Snowpipe will consolidate or split files up to optimize performance automatically
- Snowpipe runs a copy statement to load the data from the files
- Snowpipe commits the data and removes the message from the internal queue
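The workflow above is driven by a pipe, which is essentially a wrapper around a copy statement with auto-ingest enabled. A minimal sketch, using hypothetical stage, table, and file format settings:

```sql
-- Pipe that fires whenever the S3 event notification signals a new file
create pipe my_db.my_schema.events_pipe
  auto_ingest = true
as
  copy into my_db.my_schema.raw_events
  from @my_db.my_schema.events_stage
  file_format = (type = 'JSON');
```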
Snowpipe won’t necessarily process messages from its internal queue in the order they were placed into the external stage. Instead, Snowpipe will attempt to run multiple processes internally to distribute the data and ingestion workload. The number of data files that are processed in parallel is determined by the number and capacity of servers in a warehouse.
This means you should optimize how your data is organized in the external stage before loading it into Snowflake with Snowpipe.
How Do I Optimize Snowpipe?
In order to minimize the amount of data shuffling that Snowpipe performs to internally optimize its processing, there are a few general guidelines you should follow when placing data into your external stage.
(For a general set of recommendations across platforms, you can reference the General Ingestion Recommendations below)
For Snowpipe specifically, there’s a tradeoff for placing files into your external stage too frequently. Since Snowpipe maintains an internal queue, there’s overhead involved with the workload management of pushing and pulling from the queue. To minimize this overhead cost, Snowflake recommends adding a file once per minute.
If you’re using AWS, Kinesis Firehose is a convenient tool for automating this process, as it allows defining both the desired file size and the wait interval — after which a new file is sent to AWS S3.
What About Kafka?
Snowflake provides an option for consuming streaming data directly from Apache Kafka (instead of splitting your data out into an external stage).
You will need to set up the Snowflake Connector for Kafka, which requires access to the Kafka cluster. Similar to Kinesis Firehose, the connector allows you to specify the desired file size and wait interval before sending data to Snowflake for processing.
Under the hood, Snowflake creates a Snowpipe for each partition in your Kafka topic and executes a copy command into the configured table. This table can either be created ahead of time, or Snowflake will automatically create the table using the topic name and a predefined schema.
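If you create the table ahead of time, it needs the two VARIANT columns the connector writes into. A sketch with a hypothetical table name:

```sql
-- Tables loaded by the Kafka connector hold the message payload and its
-- metadata (topic, partition, offset, timestamp) as semi-structured data
create table my_db.my_schema.orders_topic (
  record_metadata variant,
  record_content variant
);
```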
If you’re finding that you need to scale ingestion performance with Kafka, you should start by evaluating how many partitions are within the Kafka topic you’re consuming from. Snowpipe creates an ingestion pipe from the internal stage for each partition in the topic. This means having too few partitions could build up back pressure, continually trying to catch up with the amount of data streaming through the pipe. However, having too many partitions can cause issues with replication latency internal to Kafka and resource contention in Snowflake.
What Is Bulk Loading?
A bulk load is the right approach for Snowflake data ingestion when you have a process that exports data on a scheduled interval from an on-premises system, a vendor, or a cloud provider.
A common example of this is running a set of queries on an on-premises or cloud database, extracting data based on a time window, and then exporting that data to your data warehouse.
In order to load this data into Snowflake, you will need to set up the appropriate permissions and Snowflake resources. Continuing with our example of AWS S3 as an external stage, you will need to configure the following:
- S3 bucket
- IAM policy for Snowflake generated IAM user
- S3 bucket policy for IAM policy
Once you have all this configured, you will be ready to start ingesting data. Unlike Snowpipe, which is a serverless process, bulk loading will require an active warehouse for processing. By default, you will need to manually run this copy command for each bulk load that you want to run.
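A manual bulk load is a copy statement run against an active warehouse. A sketch, with hypothetical warehouse, stage, and table names:

```sql
-- Requires a running warehouse in the current session
use warehouse load_wh;

copy into my_db.my_schema.orders
  from @my_db.my_schema.orders_stage/2024-01-15/
  pattern = '.*\.csv\.gz'
  file_format = (type = 'CSV' skip_header = 1);
```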
How Do I Automate Bulk Loading?
If you want your bulk loading process to run on a schedule, you will need to configure a Snowflake Task. A task lets you specify a warehouse and a schedule (defined with a cron expression) for your copy command to use.
In order to use these tasks, you will need to ensure you configure the following:
- USAGE permissions on the warehouse in the task definition
- After creating the task, you must execute an alter task to resume the task
- AutoCommit is set to true
From a security perspective, tasks run using the role that has the ownership privilege on the task. You will need to ensure that the role has the correct granted permissions to be able to read the required data and write the data into the destination.
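Putting those pieces together, a scheduled bulk load might look like this sketch (the warehouse, schedule, and object names are hypothetical):

```sql
create task my_db.my_schema.nightly_orders_load
  warehouse = load_wh
  schedule = 'USING CRON 0 2 * * * UTC'  -- daily at 02:00 UTC
as
  copy into my_db.my_schema.orders
  from @my_db.my_schema.orders_stage
  file_format = (type = 'CSV' skip_header = 1);

-- Tasks are created in a suspended state and must be resumed explicitly
alter task my_db.my_schema.nightly_orders_load resume;
```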
Warehouse Sizing for Bulk Loads
Once your file has been split into the recommended size, Snowflake will be able to distribute these files to the warehouse/cluster you’ve specified. However, a larger warehouse isn’t always guaranteed to process your data faster, so it’s recommended to perform some analysis in your specific environment.
Data load performance will greatly vary depending on what compression format your data is stored in, your file size, your file format, and your warehouse size. The most performant and cost-effective option will vary depending on these factors.
As an example, the following shows load times based on a sample dataset provided by Snowflake. This used a 10TB dataset, containing multiple tables and a 2X-Large warehouse. The comparison shows different file formats, compression algorithms, and data types that were used to load the source data.
Note: this same dataset was also loaded with a 3X-Large and an X-Large warehouse. As you would expect, the load time for the 3X-Large was faster than the 2X-Large. The credit usage was roughly the same because the 2X-Large consumes fewer credits per hour but runs for longer. In this case, you will spend roughly the same number of Snowflake credits to load a given dataset regardless of the warehouse size you use.
General Ingestion Recommendations
- Keep data files 100-250 MB in size compressed
- This may require aggregating smaller files together or splitting larger files apart before placing them into the external stage
- If ingesting JSON into a VARIANT column, set your copy command to split the outer array
- If ingesting Parquet, split large files into files 1 GB in size (or smaller) for loading
- Pre-sort your data based on your clustering keys when possible
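For the JSON recommendation above, splitting the outer array is a copy option. A sketch with hypothetical stage and table names:

```sql
-- Each element of the top-level JSON array becomes its own row
copy into my_db.my_schema.raw_events
  from @my_db.my_schema.events_stage
  file_format = (type = 'JSON' strip_outer_array = true);
```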
Streaming Specific Recommendations
- Add one file per minute to your external stage
- Configure Kinesis Firehose or Kafka Connect to batch data into recommended file size and buffer interval
- If using Kafka, Snowflake will create a pipe per Kafka topic partition. If you need to scale ingest performance, you could add additional partitions to your topic.
Bulk Specific Recommendations
- Pre-split large files into recommended file size
- Ensure the file type and compression you’re using supports pre-splitting.
Which Method Should I Use for Snowflake Data Ingestion?
One of the key aspects of data availability is how quickly you can ingest your data.
In many cases, you will need to use a mixture of streaming and batch processing. Many systems don’t have a mechanism for capturing event or CDC type data to facilitate streaming architectures, relying instead on batch/bulk data extraction.
In general, our recommendation is to use a streaming architecture whenever possible to allow for lower processing times. We see it as a more optimal option due to the event-based model that Snowpipe facilitates, as well as applying data transformations to a smaller set of data.
If you have to perform a bulk data load, there are steps you can take to increase the load performance. Namely, you should pre-split your files ahead of time to the recommended file sizes, tailor your warehouse size to the amount of data you’re trying to load, and make sure your data format allows for splitting of your data.
Common Data Ingestion Questions
Should I Use Kafka or Kinesis Firehose?
From a cost perspective, assuming you have the ability to install the Snowflake Connector on the Kafka Connect cluster, it’s going to be cheaper and faster to write directly from Kafka to Snowflake.
When using Kafka and Kafka Connect, you will have the following costs:
- Kafka (if you own the cluster)
- Kafka Connect Cluster
- This is a separate cluster that reads from a Kafka cluster
- Internal Snowflake Stage
- Snowpipe per topic partition
When using Kinesis Firehose, you’re going to need one of two configurations which have different costs:
- Write to S3
- S3 costs
- SNS and/or SQS costs
- Snowpipe costs
- Kinesis Client Library
- Compute resource to consume from Kinesis Firehose
- Snowflake Warehouse
How Does Snowflake Know Which Files Have Been Loaded?
Snowflake keeps track of which files have been loaded via Snowpipe in an internal metadata table.
Once files have been processed, they are added to the metadata table. If a file that has already been processed is updated, it will not be reprocessed. By default, this metadata is stored for 14 days. If you need to reload modified data files, you will need to recreate the pipe object.
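Recreating the pipe resets that load history, and an ALTER PIPE ... REFRESH can then queue recently staged files for ingestion. A sketch with hypothetical names:

```sql
-- Replacing the pipe clears its load-history metadata
create or replace pipe my_db.my_schema.events_pipe
  auto_ingest = true
as
  copy into my_db.my_schema.raw_events
  from @my_db.my_schema.events_stage
  file_format = (type = 'JSON');

-- Queues files already present in the stage for loading
alter pipe my_db.my_schema.events_pipe refresh;
```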