In one of our earlier blogs, we discussed the Snowpipe streaming API provided by Snowflake. This is now referred to as Classic Architecture. The Snowpipe streaming API has been used by various organizations to stream real-time data into Snowflake programmatically. However, there were always some limitations, such as latency and SDK support.
In this blog, we will examine the recently released Snowpipe Streaming High Performance architecture. Before we dive in, let’s talk about the current Snowpipe features.
Snowpipe
Snowpipe StreamingÂ
Snowpipe Streaming through Kafka
Snowpipe:Â
The classic Snowpipe is a file-based ingestion method that achieves low latency by using micro-batches. Although this is not a real-time ingestion, organizations were still able to load the data into Snowflake as quickly as possible. This was a perfect fit for IoT use cases or any other use cases that continuously produce files.
Snowpipe Streaming:
Snowpipe Streaming is Snowflake’s first-generation near-real-time architecture, designed to achieve low-latency ingestion without requiring files to be dropped into a stage location. It uses a row-based ingestion method to push data into Snowflake tables. While this eliminates file-based ingestion, the responsibility falls on the producer to set up a client, open channels, and push the data. This also means that the producer will be responsible for handling errors, retries, and other similar issues.
Snowpipe Streaming Classic Using Java SDK
Snowpipe’s classic architecture has a few limitations
The SDK is currently limited to Java only, with no support for Python.
Achieving exactly once delivery is challenging without following best practices.
It uses serverless compute for data ingestion. Since various factors, such as the number of files, the number of rows, and data type, can affect compute resources, estimating compute costs can be very challenging.
There is an additional cost associated with temporary files stored in the stage location where records are buffered, and this storage cost may not be distinguished from regular storage costs.
Snowpipe Streaming Kafka:
The Snowflake connector added a new ingestion method, SNOWPIPE_STREAMING, to achieve low latency, which was much faster than the regular file-based Snowpipe design. This was built on classic architecture and handled features such as schema evolution, error handling, and retries.
Now that we have understood the Snowpipe methods available, let’s dive into the high-performance option.
What is Snowpipe Streaming High Performance Architecture?
The new Snowpipe Streaming High-Performance Architecture is designed for data-intensive applications, offering efficient and near-real-time ingestion. The high-performance architecture is an enhancement to the classic architecture, leveraging the well-known Snowpipe pipe object to ingest data into Snowflake, rather than the table object used in the classic Snowflake architecture.Â
The use of a pipe object allows data to be transformed before being ingested into the target table, and the pipe also performs schema validation on the server side, rather than the client side, as in the classic architecture.
Clients can track the progress of channel ingestion using offset tokens. These tokens enable exact-once delivery and support de-duplication within client applications. Snowpipe Streaming can also organize in-flight data by clustering it during the ingestion process. This clustering sorts incoming data before it is committed, resulting in better organization and faster query performance on target tables.
Key features of this high-performance architecture include:
Achieving ingestion speeds of up to 10 Gb/s per table
End-to-end ingestion latency as low as 5–10 seconds per table
The ability to transform ingested data, leveraging pipe objects within the architecture
What Are the General Use Cases of Snowpipe Streaming?
When considering streaming use cases, several key scenarios come to mind.
In the past, ingesting IoT or sensor data into Snowflake depended on classic ingestion architectures, typically utilizing the Java SDK or Kafka connectors. These traditional methods introduced buffering and commit delays, resulting in higher latency before the data became available for analysis.
The following are some use cases for the high-performance architecture.
Financial institutions can leverage this high-performance architecture to stream millions of financial transactions per second directly into Snowflake. This enables near-instant fraud detection, anomaly analysis, and risk modeling with current data instead of delayed batches.
Healthcare organizations can continuously stream patient monitoring and medical sensor data into Snowflake to support real-time health analytics, predictive monitoring, and operational efficiency improvements in clinical workflows.
Manufacturing and logistics companies can use the same architecture to process industrial IoT telemetry for equipment health monitoring, predictive maintenance, and supply chain optimization.
By combining high throughput, low latency, and built-in data organization, Snowpipe Streaming High Performance unlocks true real-time analytics at scale, bridging the gap between event generation and actionable insight.
We’ll explore a straightforward example of ingesting data into Snowflake using the High Performance SDK in Python, utilizing a locally configured Kafka instance as our streaming source. This example requires understanding of Kafka, Docker, and familiarity with running Python in an Anaconda environment.
Prerequisites for Snowpipe Streaming Setup:
Confluent Kafka Configuration: Docker must be installed locally to configure Confluent Kafka. Refer to the “Set up Streaming Data” section for instructions on installing Confluent Kafka in Docker and setting up the required topics.
Anaconda Python: Ensure you have Anaconda Python installed.
Snowflake Account Privileges: The Snowflake account must have the following:
A User ID configured with an RSA key.
Admin access to create and set up the necessary database, schema, and tables.
The Python code will operate in a local environment using Conda. It will establish a connection to Kafka, which is running inside a Docker container, and then write data to Snowflake.
Prerequisites for Snowpipe Streaming Setup
Confluent Kafka Configuration: Docker must be installed locally to configure Confluent Kafka. Refer to the “Set up Streaming Data” section for instructions on installing Confluent Kafka in Docker and setting up the required topics.
Anaconda Python: Ensure you have Anaconda Python installed.
Snowflake Account Privileges: The Snowflake account must have the following:
A User ID configured with an RSA key.
Admin access to create and set up the necessary database, schema, and tables.
The Python code will operate in a local environment using Conda. It will establish a connection to Kafka, which is running inside a Docker container, and then write data to Snowflake.
Set up Streaming Data
To set up a simple Kafka environment using Docker, follow the instructions in this blog: Schema Detection and Evolution in Snowflake for Streaming data. Complete Step 1 and Step 2 as outlined. Use the Confluent datagen connector as your source to continuously generate data with the following configuration.
{
"name": "DatagenConnectorConnector_user_data",
"config": {
"value.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter.schemas.enable": "false",
"name": "DatagenConnectorConnector_user_data",
"connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"errors.log.enable": "true",
"kafka.topic": "user",
"max.interval": "500",
"schema.string": "{\"type\": \"record\",\"name\": \"userInfo\",\"namespace\": \"my.example\",\"fields\": [{\"name\": \"username\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"age\",\"type\": \"int\",\"default\": -1},{\"name\": \"phone\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"housenum\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"street\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"city\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"state_province\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"country\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"zip\",\"type\": \"string\",\"default\": \"NONE\"}]}",
"schema.keyfield": "username"
}
}
Set up Snowflake
In Snowflake, we need a table to load data. Although the datagen schema contains multiple columns, we will create a table with only 4 columns to demonstrate how to selectively load and transform data during streaming.
-- Table storing raw user-related events ingested from Kafka
CREATE OR REPLACE TABLE user_data
(
topic string COMMENT 'Name of the Kafka topic from which the event was consumed',
offset string COMMENT 'Kafka offset value used for ordering and replay tracking',
ingestion_timestamp string COMMENT 'Timestamp when the data was ingested into the system',
data variant COMMENT 'Full raw JSON payload as received from Kafka',
user_name string COMMENT 'User name extracted from the raw JSON data',
age string COMMENT 'User age extracted from the raw JSON data',
phone string COMMENT 'User phone number extracted from the raw JSON data',
city string COMMENT 'User city extracted from the raw JSON data'
);
CREATE OR REPLACE PIPE user_data_pipe
AS
COPY INTO user_data FROM
(
SELECT
$1:topic,
$1:offset,
$1:ingestion_timestamp,
$1:data,
$1:data."username"::string,
$1:data."age"::string,
$1:data."phone"::string,
$1:data."city"::string
FROM
TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
Snowpipe streaming will use this pipe object to ingest data from Kafka into the user_data table.
Set up Snowpipe Streaming SDK
We will use Conda to create a virtual environment. For this demo, you’ll need a virtual environment with the snowpipe-streaming library installed.
conda create --name snowpark-streaming python=3.11
conda activate snowpark-streaming
pip install snowpipe-streaming
Snowpipe Streaming SDK - Python
Before diving into the code for Snowpipe streaming, let’s review the key functions used:
streamingIngestClient: Initializes a streaming ingestion client object. This method requires the following parameters:client_name– A unique identifier to track the clientdb_nameschema_namepipe_nameprofile_jsoncontaining Snowflake authentication details
open_channel: Opens a channel on the client instance.append_row: Appends rows to the open channel.get_latest_committed_offset_token: Retrieves the latest committed offset token. Use this to monitor metadata and ensure ingestion proceeds without issues.close: Closes both the channel and the client.
Below is a sample code snippet demonstrating how to ingest data from Kafka into Snowflake tables.
{
"user": "SNOWFLAKE_USER",
"account": "SNOWFLAKE_ACCOUNT",
"authorization_type": "jwt",
"url": "https://SNOWFLAKE_ACCOUNT.snowflakecomputing.com:443",
"private_key": "",
"role" : "SNOWFLAKE_ROLE"
}
#!/usr/bin/env python3
"""
Simple Kafka to Snowpipe Streaming - Single File Version
Streamlined version combining Kafka consumer and Snowflake ingestion.
Update configuration and run!
Usage:
python3 kafka_snowpipe_simple.py
"""
from confluent_kafka import Consumer, KafkaError
import json
from datetime import datetime
import uuid
import os
import time
os.environ["SS_LOG_LEVEL"] = "warn"
from snowflake.ingest.streaming import StreamingIngestClient
# ============================================================================
# CONFIGURATION - UPDATE THESE
# ============================================================================
# Kafka Configuration
KAFKA_BROKER = 'localhost:9092'
KAFKA_TOPIC = 'user'
KAFKA_GROUP = 'snowpipe-consumer'
# Snowflake Configuration
SNOWFLAKE_DB = 'SANDBOX'
SNOWFLAKE_SCHEMA = 'DGANIGER'
SNOWFLAKE_TABLE = 'user_data'
SNOWFLAKE_PIPE = 'user_data_pipe'
PROFILE_JSON = 'profile.json'
# Processing Configuration
BATCH_SIZE = 50 # Number of messages per batch
IDLE_TIMEOUT_SECONDS = 5 # Exit if no messages for this many seconds
# ============================================================================
# GLOBAL STATE
# ============================================================================
class State:
"""Holds global state for processing."""
def __init__(self):
self.snowflake_client = None
self.snowflake_channel = None
self.kafka_consumer = None
self.batch = []
self.total_processed = 0
self.last_message_time = None
state = State()
# ============================================================================
# SNOWFLAKE SETUP
# ============================================================================
def setup_snowflake():
"""Initialize Snowflake Streaming client and channel."""
print("Connecting to Snowflake...")
# Create client
state.snowflake_client = StreamingIngestClient(
client_name=f"KAFKA_CLIENT_{uuid.uuid4()}",
db_name=SNOWFLAKE_DB,
schema_name=SNOWFLAKE_SCHEMA,
pipe_name=SNOWFLAKE_PIPE,
profile_json=PROFILE_JSON
)
# Open channel
state.snowflake_channel, status = state.snowflake_client.open_channel(
f"KAFKA_CHANNEL_{uuid.uuid4()}"
)
print(f" Snowflake connected: {state.snowflake_channel.channel_name}")
print(f" Database: {SNOWFLAKE_DB}")
print(f" Schema: {SNOWFLAKE_SCHEMA}")
print(f" Table: {SNOWFLAKE_TABLE}\n")
# ============================================================================
# KAFKA SETUP
# ============================================================================
def setup_kafka():
"""Initialize Kafka consumer."""
print(" Connecting to Kafka...")
config = {
'bootstrap.servers': KAFKA_BROKER,
'group.id': KAFKA_GROUP,
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'session.timeout.ms': 6000,
}
state.kafka_consumer = Consumer(config)
state.kafka_consumer.subscribe([KAFKA_TOPIC])
print(f" Kafka connected: {KAFKA_BROKER}")
print(f" Topic: {KAFKA_TOPIC}\n")
# ============================================================================
# MESSAGE PROCESSING
# ============================================================================
def transform_message(msg):
"""
Transform Kafka message to Snowflake row.
Creates a JSON with structure:
{
"topic": "topic_name",
"offset": 12345,
"ingestion_timestamp": "2025-10-21T14:30:22.123456",
"data": { ... original kafka message ... }
}
"""
try:
# Parse JSON from Kafka
kafka_data = json.loads(msg.value().decode('utf-8'))
except json.JSONDecodeError:
# Handle non-JSON messages - wrap as string
kafka_data = msg.value().decode('utf-8', errors='ignore')
# Create row with the requested structure
# Best Practice - Include metadata fields like topic, offset and ingestion timestamp to track
row = {
'topic': msg.topic(),
'offset': msg.offset(),
'ingestion_timestamp': datetime.now().isoformat(),
'data': kafka_data
}
return row
def process_message(msg):
"""Process each Kafka message."""
try:
# Update last message time
state.last_message_time = time.time()
# Transform message
row = transform_message(msg)
# Add to batch
state.batch.append(row)
state.total_processed += 1
print(f" Message {state.total_processed} buffered (batch: {len(state.batch)}/{BATCH_SIZE})")
# Flush when batch is full
if len(state.batch) >= BATCH_SIZE:
flush_to_snowflake()
except Exception as e:
print(f" Error processing message: {e}")
def check_idle_timeout():
"""Check if idle timeout has been reached."""
if state.last_message_time is None:
return False
idle_time = time.time() - state.last_message_time
if idle_time >= IDLE_TIMEOUT_SECONDS:
print(f"\n No messages received for {idle_time:.1f} seconds")
print(f" Idle timeout ({IDLE_TIMEOUT_SECONDS}s) reached - initiating graceful shutdown...")
return True
return False
def flush_to_snowflake():
"""Flush batch to Snowflake."""
if not state.batch:
return
print(f"\n Flushing {len(state.batch)} messages to Snowflake...")
try:
# Append rows to Snowflake channel
for i, row in enumerate(state.batch):
# Use topic and offset to create unique row ID
row_id = f"{row.get('topic', 'unknown')}_{row.get('offset', i)}"
state.snowflake_channel.append_row(row, row_id)
print(f" Batch ingested! Total processed: {state.total_processed}\n")
# Clear batch
state.batch = []
except Exception as e:
print(f" Error flushing to Snowflake: {e}")
raise
# ============================================================================
# CLEANUP
# Flush any remaining messages and close kafka connection
# ============================================================================
def cleanup():
"""Cleanup resources."""
print("\n Cleaning up...")
# Flush remaining messages
if state.batch:
print(f" Flushing remaining {len(state.batch)} messages...")
flush_to_snowflake()
# Close Kafka consumer
if state.kafka_consumer:
state.kafka_consumer.close()
print(" Kafka consumer closed")
# Close Snowflake resources
if state.snowflake_channel:
state.snowflake_channel.close()
print("Snowflake channel closed")
if state.snowflake_client:
state.snowflake_client.close()
print("Snowflake client closed")
print(f" Final Stats:")
print(f"Total messages processed: {state.total_processed}")
print(f"Messages successfully ingested: {state.total_processed - len(state.batch)}")
# ============================================================================
# MAIN
# ============================================================================
def main():
"""Main function."""
print("\n" + "="*60)
print("Kafka → Snowpipe Streaming - Simple Version")
print("="*60 + "\n")
try:
# Setup Snowflake and Kafka
setup_snowflake()
setup_kafka()
print("="*60)
print(f" Batch Size: {BATCH_SIZE} messages")
print(f" Idle Timeout: {IDLE_TIMEOUT_SECONDS} seconds")
print(" Press Ctrl+C to stop")
print("="*60 + "\n")
# Initialize last message time
state.last_message_time = time.time()
# Start consuming
print(" Listening for messages...\n")
while True:
# Poll for a single message with 1 second timeout
msg = state.kafka_consumer.poll(timeout=1.0)
if msg is None:
# No message received, check for idle timeout
if check_idle_timeout():
break
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(f" Kafka error: {msg.error()}")
continue
# Process the message
process_message(msg)
print(" Graceful shutdown initiated")
except KeyboardInterrupt:
print(" Stopped by user")
except Exception as e:
print(f"\n Error: {e}")
import traceback
traceback.print_exc()
finally:
cleanup()
if __name__ == '__main__':
main()
This sample demonstrated how to use the Python SDK to ingest data from a streaming source into a Snowflake table. Snowflake also supports data ingestion through the Java SDK and REST API. In this example, a pipe object was used to selectively choose columns and ingest data into tables. Two alternative approaches can be used.
Ingest incoming data as-is into a
VARIANTcolumn, and extract parsed fields with additional metadata such as client name, channel name, and offset.Use
MATCH_BY_COLUMN_NAME=CASE_INSENSITIVEin the pipe command to automatically parse JSON fields and load them into Snowflake tables. Note that Snowflake does not support automatic schema evolution—if fields are missing from the table, Snowflake will not add them. In this scenario, the pipe command will be as follows.
CREATE OR REPLACE PIPE USER_DATA_PIPE
AS
COPY INTO USER_DATA TABLE(DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;
Best Practices
The following are some best practices to adopt when utilizing high-performance architecture.
- Enable multiple channels for data ingestion into Snowflake tables to increase throughput.
- Include metadata fields in the payload to capture the client name, channel name, and offset tokens for tracking the ingestion process.
-
Periodically check for gaps in offset tokens to verify data completeness. Snowflake logs any ingestion errors in the
SNOWPIPE_STREAMING_CHANNEL_HISTORYview; however, successful ingestions and related metadata are not currently tracked.
Closing
This blog explores how to leverage the new Snowpipe streaming feature for continuous data ingestion using Python, highlighting the differences between Classic and High-Performance architectures. It is essential for organizations to carefully assess their specific use cases to determine which architecture best suits their needs.
Which Snowpipe Streaming approach fits you best?
At phData, our experts are ready to evaluate your design and architecture and help develop the optimal solution. Let’s explore your Snowpipe Streaming use case together.




