December 17, 2025

Snowflake Snowpipe Streaming High-performance Architecture

By Deepa Ganiger

In one of our earlier blogs, we discussed the Snowpipe streaming API provided by Snowflake. This is now referred to as Classic Architecture. The Snowpipe streaming API has been used by various organizations to stream real-time data into Snowflake programmatically. However, there were always some limitations, such as latency and SDK support.

In this blog, we will examine the recently released Snowpipe Streaming High Performance architecture. Before we dive in, let’s talk about the current Snowpipe features.

  • Snowpipe

  • Snowpipe Streaming 

  • Snowpipe Streaming through Kafka

Snowpipe: 

The classic Snowpipe is a file-based ingestion method that achieves low latency by using micro-batches. Although this is not a real-time ingestion, organizations were still able to load the data into Snowflake as quickly as possible. This was a perfect fit for IoT use cases or any other use cases that continuously produce files.

Snowpipe Streaming:

Snowpipe Streaming is Snowflake’s first-generation near-real-time architecture, designed to achieve low-latency ingestion without requiring files to be dropped into a stage location. It uses a row-based ingestion method to push data into Snowflake tables. While this eliminates file-based ingestion, the responsibility falls on the producer to set up a client, open channels, and push the data. This also means that the producer will be responsible for handling errors, retries, and other similar issues.

Snowpipe Streaming Classic Using Java SDK

Snowpipe’s classic architecture has a few limitations

  • The SDK is currently limited to Java only, with no support for Python.

  • Achieving exactly once delivery is challenging without following best practices.

  • It uses serverless compute for data ingestion. Since various factors, such as the number of files, the number of rows, and data type, can affect compute resources, estimating compute costs can be very challenging.

  • There is an additional cost associated with temporary files stored in the stage location where records are buffered, and this storage cost may not be distinguished from regular storage costs.

Snowpipe Streaming Kafka:

The Snowflake connector added a new ingestion method, SNOWPIPE_STREAMING, to achieve low latency, which was much faster than the regular file-based Snowpipe design. This was built on classic architecture and handled features such as schema evolution, error handling, and retries.

Now that we have understood the Snowpipe methods available, let’s dive into the high-performance option.

What is Snowpipe Streaming High Performance Architecture?

The new Snowpipe Streaming High-Performance Architecture is designed for data-intensive applications, offering efficient and near-real-time ingestion. The high-performance architecture is an enhancement to the classic architecture, leveraging the well-known Snowpipe pipe object to ingest data into Snowflake, rather than the table object used in the classic Snowflake architecture. 

The use of a pipe object allows data to be transformed before being ingested into the target table, and the pipe also performs schema validation on the server side, rather than the client side, as in the classic architecture.

Comparison between Snowflake Streaming Classic (Java client channels inserting rows directly into a table) vs High Performance (Python/Java client channels appending rows through a Pipe into the table).

Clients can track the progress of channel ingestion using offset tokens. These tokens enable exact-once delivery and support de-duplication within client applications. Snowpipe Streaming can also organize in-flight data by clustering it during the ingestion process. This clustering sorts incoming data before it is committed, resulting in better organization and faster query performance on target tables.

Key features of this high-performance architecture include:

  • Achieving ingestion speeds of up to 10 Gb/s per table

  • End-to-end ingestion latency as low as 5–10 seconds per table

  • The ability to transform ingested data, leveraging pipe objects within the architecture

What Are the General Use Cases of Snowpipe Streaming?

When considering streaming use cases, several key scenarios come to mind.

In the past, ingesting IoT or sensor data into Snowflake depended on classic ingestion architectures, typically utilizing the Java SDK or Kafka connectors. These traditional methods introduced buffering and commit delays, resulting in higher latency before the data became available for analysis.

The following are some use cases for the high-performance architecture.

  • Financial institutions can leverage this high-performance architecture to stream millions of financial transactions per second directly into Snowflake. This enables near-instant fraud detection, anomaly analysis, and risk modeling with current data instead of delayed batches.

  • Healthcare organizations can continuously stream patient monitoring and medical sensor data into Snowflake to support real-time health analytics, predictive monitoring, and operational efficiency improvements in clinical workflows.

  • Manufacturing and logistics companies can use the same architecture to process industrial IoT telemetry for equipment health monitoring, predictive maintenance, and supply chain optimization.

By combining high throughput, low latency, and built-in data organization, Snowpipe Streaming High Performance unlocks true real-time analytics at scale, bridging the gap between event generation and actionable insight.

We’ll explore a straightforward example of ingesting data into Snowflake using the High Performance SDK in Python, utilizing a locally configured Kafka instance as our streaming source. This example requires understanding of Kafka, Docker, and familiarity with running Python in an Anaconda environment.

Prerequisites for Snowpipe Streaming Setup:

  • Confluent Kafka Configuration: Docker must be installed locally to configure Confluent Kafka. Refer to the “Set up Streaming Data” section for instructions on installing Confluent Kafka in Docker and setting up the required topics.

  • Anaconda Python: Ensure you have Anaconda Python installed.

  • Snowflake Account Privileges: The Snowflake account must have the following:

    • A User ID configured with an RSA key.

    • Admin access to create and set up the necessary database, schema, and tables.

The Python code will operate in a local environment using Conda. It will establish a connection to Kafka, which is running inside a Docker container, and then write data to Snowflake.

Prerequisites for Snowpipe Streaming Setup

  • Confluent Kafka Configuration: Docker must be installed locally to configure Confluent Kafka. Refer to the “Set up Streaming Data” section for instructions on installing Confluent Kafka in Docker and setting up the required topics.

  • Anaconda Python: Ensure you have Anaconda Python installed.

  • Snowflake Account Privileges: The Snowflake account must have the following:

    • A User ID configured with an RSA key.

    • Admin access to create and set up the necessary database, schema, and tables.

The Python code will operate in a local environment using Conda. It will establish a connection to Kafka, which is running inside a Docker container, and then write data to Snowflake.

Set up Streaming Data

To set up a simple Kafka environment using Docker, follow the instructions in this blog: Schema Detection and Evolution in Snowflake for Streaming data. Complete Step 1 and Step 2 as outlined. Use the Confluent datagen connector as your source to continuously generate data with the following configuration.

				
					{
  "name": "DatagenConnectorConnector_user_data",
  "config": {
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "false",
    "name": "DatagenConnectorConnector_user_data",
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.log.enable": "true",
    "kafka.topic": "user",
    "max.interval": "500",
    "schema.string": "{\"type\": \"record\",\"name\": \"userInfo\",\"namespace\": \"my.example\",\"fields\": [{\"name\": \"username\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"age\",\"type\": \"int\",\"default\": -1},{\"name\": \"phone\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"housenum\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"street\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"city\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"state_province\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"country\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"zip\",\"type\": \"string\",\"default\": \"NONE\"}]}",
    "schema.keyfield": "username"
  }
}

				
			

Set up Snowflake

In Snowflake, we need a table to load data. Although the datagen schema contains multiple columns, we will create a table with only 4 columns to demonstrate how to selectively load and transform data during streaming.

				
					-- Table storing raw user-related events ingested from Kafka
CREATE OR REPLACE TABLE user_data
(
 topic string COMMENT 'Name of the Kafka topic from which the event was consumed',
 offset string COMMENT 'Kafka offset value used for ordering and replay tracking',
 ingestion_timestamp string COMMENT 'Timestamp when the data was ingested into the system',
 data variant COMMENT 'Full raw JSON payload as received from Kafka',
 user_name string COMMENT 'User name extracted from the raw JSON data',
 age string COMMENT 'User age extracted from the raw JSON data',
 phone string COMMENT 'User phone number extracted from the raw JSON data',
 city string COMMENT 'User city extracted from the raw JSON data'
);



CREATE OR REPLACE PIPE user_data_pipe
AS
COPY INTO user_data FROM
    (
    SELECT 
    $1:topic,
    $1:offset,
    $1:ingestion_timestamp,
    $1:data,
        $1:data."username"::string,
        $1:data."age"::string,
        $1:data."phone"::string,
        $1:data."city"::string
    FROM 
         TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
    );
				
			

Snowpipe streaming will use this pipe object to ingest data from Kafka into the user_data table.

Set up Snowpipe Streaming SDK

We will use Conda to create a virtual environment. For this demo, you’ll need a virtual environment with the snowpipe-streaming library installed.

				
					conda create --name snowpark-streaming python=3.11
conda activate snowpark-streaming
pip install snowpipe-streaming
				
			

Snowpipe Streaming SDK - Python

Before diving into the code for Snowpipe streaming, let’s review the key functions used:

  • streamingIngestClient: Initializes a streaming ingestion client object. This method requires the following parameters:

    • client_name – A unique identifier to track the client

    • db_name

    • schema_name

    • pipe_name

    • profile_json containing Snowflake authentication details

  • open_channel: Opens a channel on the client instance.

  • append_row: Appends rows to the open channel.

  • get_latest_committed_offset_token: Retrieves the latest committed offset token. Use this to monitor metadata and ensure ingestion proceeds without issues.

  • close: Closes both the channel and the client.

Below is a sample code snippet demonstrating how to ingest data from Kafka into Snowflake tables.

profile.json - configuration
				
					{
    "user": "SNOWFLAKE_USER",
    "account": "SNOWFLAKE_ACCOUNT",
    "authorization_type": "jwt",
    "url": "https://SNOWFLAKE_ACCOUNT.snowflakecomputing.com:443",
    "private_key": "<private key>",
    "role" : "SNOWFLAKE_ROLE" 
}

				
			
kafka_snowpipe_simple.py - Python Code
				
					#!/usr/bin/env python3
"""
Simple Kafka to Snowpipe Streaming - Single File Version

Streamlined version combining Kafka consumer and Snowflake ingestion.
Update configuration and run!

Usage:
    python3 kafka_snowpipe_simple.py
"""

from confluent_kafka import Consumer, KafkaError
import json
from datetime import datetime
import uuid
import os
import time

os.environ["SS_LOG_LEVEL"] = "warn"
from snowflake.ingest.streaming import StreamingIngestClient

# ============================================================================
# CONFIGURATION - UPDATE THESE
# ============================================================================

# Kafka Configuration
KAFKA_BROKER = 'localhost:9092'
KAFKA_TOPIC = 'user'
KAFKA_GROUP = 'snowpipe-consumer'

# Snowflake Configuration
SNOWFLAKE_DB = 'SANDBOX'
SNOWFLAKE_SCHEMA = 'DGANIGER'
SNOWFLAKE_TABLE = 'user_data'
SNOWFLAKE_PIPE = 'user_data_pipe'
PROFILE_JSON = 'profile.json'

# Processing Configuration
BATCH_SIZE = 50  # Number of messages per batch
IDLE_TIMEOUT_SECONDS = 5  # Exit if no messages for this many seconds

# ============================================================================
# GLOBAL STATE
# ============================================================================

class State:
    """Holds global state for processing."""
    def __init__(self):
        self.snowflake_client = None
        self.snowflake_channel = None
        self.kafka_consumer = None
        self.batch = []
        self.total_processed = 0
        self.last_message_time = None

state = State()

# ============================================================================
# SNOWFLAKE SETUP
# ============================================================================

def setup_snowflake():
    """Initialize Snowflake Streaming client and channel."""
    print("Connecting to Snowflake...")
    
    # Create client
    state.snowflake_client = StreamingIngestClient(
        client_name=f"KAFKA_CLIENT_{uuid.uuid4()}",
        db_name=SNOWFLAKE_DB,
        schema_name=SNOWFLAKE_SCHEMA,
        pipe_name=SNOWFLAKE_PIPE,
        profile_json=PROFILE_JSON
    )
    
    # Open channel
    state.snowflake_channel, status = state.snowflake_client.open_channel(
        f"KAFKA_CHANNEL_{uuid.uuid4()}"
    )
    
    print(f"  Snowflake connected: {state.snowflake_channel.channel_name}")
    print(f"   Database: {SNOWFLAKE_DB}")
    print(f"   Schema: {SNOWFLAKE_SCHEMA}")
    print(f"   Table: {SNOWFLAKE_TABLE}\n")

# ============================================================================
# KAFKA SETUP
# ============================================================================

def setup_kafka():
    """Initialize Kafka consumer."""
    print(" Connecting to Kafka...")
    
    config = {
        'bootstrap.servers': KAFKA_BROKER,
        'group.id': KAFKA_GROUP,
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'session.timeout.ms': 6000,
    }
    
    state.kafka_consumer = Consumer(config)
    state.kafka_consumer.subscribe([KAFKA_TOPIC])
    
    print(f"  Kafka connected: {KAFKA_BROKER}")
    print(f"   Topic: {KAFKA_TOPIC}\n")

# ============================================================================
# MESSAGE PROCESSING
# ============================================================================

def transform_message(msg):
    """
    Transform Kafka message to Snowflake row.
    
    Creates a JSON with structure:
    {
        "topic": "topic_name",
        "offset": 12345,
        "ingestion_timestamp": "2025-10-21T14:30:22.123456",
        "data": { ... original kafka message ... }
    }
    """
    try:
        # Parse JSON from Kafka
        kafka_data = json.loads(msg.value().decode('utf-8'))
    except json.JSONDecodeError:
        # Handle non-JSON messages - wrap as string
        kafka_data = msg.value().decode('utf-8', errors='ignore')
    
    # Create row with the requested structure
    # Best Practice - Include metadata fields like topic, offset and ingestion timestamp to track
    row = {
        'topic': msg.topic(),
        'offset': msg.offset(),
        'ingestion_timestamp': datetime.now().isoformat(),
        'data': kafka_data
    }
    
    return row

def process_message(msg):
    """Process each Kafka message."""
    try:
        # Update last message time
        state.last_message_time = time.time()
        
        # Transform message
        row = transform_message(msg)
        
        # Add to batch
        state.batch.append(row)
        state.total_processed += 1
        
        print(f" Message {state.total_processed} buffered (batch: {len(state.batch)}/{BATCH_SIZE})")
        
        # Flush when batch is full
        if len(state.batch) >= BATCH_SIZE:
            flush_to_snowflake()
            
    except Exception as e:
        print(f" Error processing message: {e}")

def check_idle_timeout():
    """Check if idle timeout has been reached."""
    if state.last_message_time is None:
        return False
    
    idle_time = time.time() - state.last_message_time
    if idle_time >= IDLE_TIMEOUT_SECONDS:
        print(f"\n  No messages received for {idle_time:.1f} seconds")
        print(f"   Idle timeout ({IDLE_TIMEOUT_SECONDS}s) reached - initiating graceful shutdown...")
        return True
    return False

def flush_to_snowflake():
    """Flush batch to Snowflake."""
    if not state.batch:
        return
        
    print(f"\n Flushing {len(state.batch)} messages to Snowflake...")
    
    try:
        # Append rows to Snowflake channel
        for i, row in enumerate(state.batch):
            # Use topic and offset to create unique row ID
            row_id = f"{row.get('topic', 'unknown')}_{row.get('offset', i)}"
            state.snowflake_channel.append_row(row, row_id)
        
        print(f" Batch ingested! Total processed: {state.total_processed}\n")
        
        # Clear batch
        state.batch = []
        
    except Exception as e:
        print(f" Error flushing to Snowflake: {e}")
        raise

# ============================================================================
# CLEANUP
# Flush any remaining messages and close kafka connection 
# ============================================================================

def cleanup():
    """Cleanup resources."""
    print("\n Cleaning up...")
    
    # Flush remaining messages
    if state.batch:
        print(f"   Flushing remaining {len(state.batch)} messages...")
        flush_to_snowflake()
    
    # Close Kafka consumer
    if state.kafka_consumer:
        state.kafka_consumer.close()
        print("   Kafka consumer closed")
    
    # Close Snowflake resources
    if state.snowflake_channel:
        state.snowflake_channel.close()
        print("Snowflake channel closed")
    
    if state.snowflake_client:
        state.snowflake_client.close()
        print("Snowflake client closed")
    
    print(f" Final Stats:")
    print(f"Total messages processed: {state.total_processed}")
    print(f"Messages successfully ingested: {state.total_processed - len(state.batch)}")

# ============================================================================
# MAIN
# ============================================================================

def main():
    """Main function."""
    print("\n" + "="*60)
    print("Kafka → Snowpipe Streaming - Simple Version")
    print("="*60 + "\n")
    
    try:
        # Setup Snowflake and Kafka
        setup_snowflake()
        setup_kafka()
        
        print("="*60)
        print(f" Batch Size: {BATCH_SIZE} messages")
        print(f"  Idle Timeout: {IDLE_TIMEOUT_SECONDS} seconds")
        print(" Press Ctrl+C to stop")
        print("="*60 + "\n")
        
        # Initialize last message time
        state.last_message_time = time.time()
        
        # Start consuming
        print(" Listening for messages...\n")
        
        while True:
            # Poll for a single message with 1 second timeout
            msg = state.kafka_consumer.poll(timeout=1.0)
            
            if msg is None:
                # No message received, check for idle timeout
                if check_idle_timeout():
                    break
                continue
            
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                else:
                    print(f" Kafka error: {msg.error()}")
                    continue
            
            # Process the message
            process_message(msg)
        
        print(" Graceful shutdown initiated")
        
    except KeyboardInterrupt:
        print("  Stopped by user")
        
    except Exception as e:
        print(f"\n Error: {e}")
        import traceback
        traceback.print_exc()
        
    finally:
        cleanup()

if __name__ == '__main__':
    main()


				
			

This sample demonstrated how to use the Python SDK to ingest data from a streaming source into a Snowflake table. Snowflake also supports data ingestion through the Java SDK and REST API. In this example, a pipe object was used to selectively choose columns and ingest data into tables. Two alternative approaches can be used.

  • Ingest incoming data as-is into a VARIANT column, and extract parsed fields with additional metadata such as client name, channel name, and offset.

  • Use MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE in the pipe command to automatically parse JSON fields and load them into Snowflake tables. Note that Snowflake does not support automatic schema evolution—if fields are missing from the table, Snowflake will not add them. In this scenario, the pipe command will be as follows.

				
					CREATE OR REPLACE PIPE USER_DATA_PIPE
AS
   COPY INTO USER_DATA TABLE(DATA_SOURCE(TYPE => 'STREAMING')) MATCH_BY_COLUMN_NAME=CASE_INSENSITIVE;
				
			

Best Practices

The following are some best practices to adopt when utilizing high-performance architecture.

Closing

This blog explores how to leverage the new Snowpipe streaming feature for continuous data ingestion using Python, highlighting the differences between Classic and High-Performance architectures. It is essential for organizations to carefully assess their specific use cases to determine which architecture best suits their needs.

phData Blue Shield

Which Snowpipe Streaming approach fits you best?

At phData, our experts are ready to evaluate your design and architecture and help develop the optimal solution. Let’s explore your Snowpipe Streaming use case together.

Data Coach is our premium analytics training program with one-on-one coaching from renowned experts.

Accelerate and automate your data projects with the phData Toolkit