April 18, 2024

Schema Detection and Evolution in Snowflake for Streaming Data

By Deepa Ganiger

In our last blog, we looked at how to use the Snowflake Data Cloud's schema detection and evolution feature while loading structured or semi-structured data. This is very helpful during batch loading with the COPY command or with Snowpipe.

In this blog, we will see how to implement schema change detection and evolution in a streaming scenario. We will ingest data using Confluent Kafka and enable schema detection in the Snowflake Kafka connector.

What is Schema Detection and Evolution?

Schema detection and evolution is an automated process that detects changes in the incoming data source's schema and updates the structure of the table in the target system (in this case, Snowflake).

Traditionally, this is handled with custom processes that compare schemas and alter tables as needed, but Snowflake continues to introduce features that implement this natively rather than requiring a custom process. Currently, Snowflake supports schema detection and evolution for CSV, Parquet, and JSON files, as well as for JSON within streaming data from Kafka.
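As a rough illustration of the batch side covered in the previous blog (a sketch only; the table, stage, and file format names here are made up for the example), detection is driven by INFER_SCHEMA and evolution by the ENABLE_SCHEMA_EVOLUTION table property, with COPY loading by column name:

# Sketch of batch-side schema detection and evolution (illustrative names;
# SnowSQL with a named connection my_conn is assumed)
snowsql -c my_conn -q "
  -- Detect the schema of staged Parquet files and create the table from it
  CREATE TABLE user_batch USING TEMPLATE (
    SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
    FROM TABLE(INFER_SCHEMA(LOCATION => '@my_stage', FILE_FORMAT => 'my_parquet_format'))
  );

  -- Allow the table to pick up new columns from future loads
  ALTER TABLE user_batch SET ENABLE_SCHEMA_EVOLUTION = TRUE;

  -- Load by column name so newly added columns are handled automatically
  COPY INTO user_batch
    FROM @my_stage
    FILE_FORMAT = (FORMAT_NAME = 'my_parquet_format')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE;
"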

How Are Schema Detection and Evolution in Streaming Different From Batch?

Schema evolution is not frequent in batch processing unless the data is trickling in as flat files from IoT devices. In streaming, however, especially when data arrives as JSON or Avro, there will always be cases where the source system adds new fields.

Setting up schema evolution in the Snowflake Kafka connector is quite simple, as we will see below. However, there are still limitations depending on the complexity of the data.

Prerequisites

This blog requires the following to be set up. Confluent offers a cloud version of Kafka, but for this demo, we will run Kafka locally using a Docker setup.

  • Docker setup

  • Kafka setup

  • Snowflake account

  • Snowflake Connector for Kafka

Step 1: Confluent Docker Setup

Docker is a platform for developing and building applications in containers, which lets organizations build platform-agnostic applications and deploy them to services like EKS, AKS, etc. Docker can be downloaded and installed directly from the Docker website. Once Docker is installed, let's start setting up the containers.

Download the docker-compose.yml file from Confluent's cp-all-in-one GitHub repository.

curl --silent --output docker-compose.yml https://raw.githubusercontent.com/confluentinc/cp-all-in-one/7.6.0-post/cp-all-in-one-kraft/docker-compose.yml

Once the file is downloaded, bring up the containers by running the following command. Ensure that Docker is already up and running.

docker-compose up -d --build

This will bring up the containers required for Kafka, such as the broker, Schema Registry, Control Center, and Kafka Connect (with the Datagen connector pre-installed to generate some dummy data). However, to ingest data into Snowflake, we also need the Snowflake Kafka connector installed inside the Connect container.

Before installing the Snowflake Kafka connector, verify that the containers are up and running by executing the following command.

docker ps

This will display the list of containers that are running, as shown below.

Once this is confirmed, run the following commands to install the Snowflake Kafka connector inside the Connect container and then restart the Connect service.

docker-compose exec connect bash -c 'confluent-hub install --no-prompt --verbose snowflakeinc/snowflake-kafka-connector:2.0.1'

docker-compose restart connect

Now that the Kafka setup is done, let's set up the Kafka topic and ingest data into Snowflake.

Once the cluster is up and running, the Confluent Control Center UI can be accessed at http://localhost:9021/clusters from your local machine.

Going forward, we will use this UI for tasks like setting up topics and connector configurations.

Step 2: Kafka Topic Setup

In the Confluent UI, go to Topics within the cluster and create a topic named user with all default settings and a single partition. We will initially ingest user data with a specific schema and then change the layout to see how the connector behaves in a schema change/evolution scenario.
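If you prefer the command line, the topic can also be created inside the broker container. A sketch, assuming the broker service name and listener port used by the cp-all-in-one compose file:

# Create the user topic with a single partition
docker-compose exec broker kafka-topics --create \
  --topic user \
  --partitions 1 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092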

Now that the topic is created, let's generate some data. Instead of generating data manually or with a programming language like Python, we will use the Datagen connector that was already installed as part of the initial setup. Head over to the Connect tab on the left, click connect-default, and then click the Add Connector button. You should see the Datagen connector on the screen below.

Instead of manually configuring the Datagen connector, use the configuration below to define the data to be generated. Copy the contents below into a text file and save it as datagen-configuration.json.

Then click on Upload connector config file in the UI and upload the datagen-configuration.json file. Ensure that the topic name in the configuration file matches the topic you created earlier.
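If you prefer to skip the UI, the saved file can also be submitted directly to the Kafka Connect REST API. A sketch, assuming Connect's REST port 8083 is exposed on localhost as in the cp-all-in-one compose file; the file's {"name": ..., "config": ...} layout matches the payload this endpoint expects:

# Register the Datagen connector through the Connect REST API
curl -s -X POST -H "Content-Type: application/json" \
  --data @datagen-configuration.json \
  http://localhost:8083/connectors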

				
{
  "name": "DatagenConnectorConnector_user_data",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable": "true",
    "kafka.topic": "user",
    "max.interval": "10000",
    "schema.string": "{\"type\": \"record\",\"name\": \"userInfo\",\"namespace\": \"my.example\",\"fields\": [{\"name\": \"username\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"age\",\"type\": \"int\",\"default\": -1},{\"name\": \"phone\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"housenum\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"street\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"city\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"state_province\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"country\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"zip\",\"type\": \"string\",\"default\": \"NONE\"}]}",
    "schema.keyfield": "username",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "false"
  }
}

This configuration pushes records to the user topic at intervals of up to 10 seconds (max.interval is in milliseconds). Once the Datagen connector is configured and started, head over to the topic screen to verify that messages are being produced.
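The topic can also be inspected from the command line. A sketch, assuming the internal broker listener (broker:29092) and the console tools bundled in the schema-registry container of the cp-all-in-one setup:

# Read a few Avro records from the user topic to confirm data is flowing
docker-compose exec schema-registry kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic user \
  --from-beginning \
  --max-messages 5 \
  --property schema.registry.url=http://localhost:8081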

Step 3: Snowflake Kafka Connect Configuration

The next step is to set up the connector configuration to stream the data into Snowflake. Ensure that you have a Snowflake account with a database and schema to load data into. The role used should be able to create stages, pipes, and tables in that schema.

The Snowflake Kafka connector uses key-pair authentication instead of a regular username and password. Ensure that the user you plan to use already has key-pair authentication configured.
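If the user does not have a key pair yet, one common way to generate one (following the approach in Snowflake's key-pair authentication documentation; the file names are just examples) is:

# Generate an encrypted private key (you will be prompted for a passphrase)
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

# Derive the matching public key
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# Register the public key on the Snowflake user (run in a Snowflake worksheet):
#   ALTER USER <user_name> SET RSA_PUBLIC_KEY='<contents of rsa_key.pub without the BEGIN/END lines>';

The snowflake.private.key value in the configuration below is then the contents of rsa_key.p8 with the header, footer, and line breaks removed, and snowflake.private.key.passphrase is the passphrase chosen above.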

As with the Datagen connector, use the Connect cluster screen and upload the configuration file below. Before uploading, be sure to update the following properties with your account details.

  • snowflake.url.name

  • snowflake.user.name

  • snowflake.private.key

  • snowflake.private.key.passphrase

  • snowflake.database.name

  • snowflake.schema.name

  • snowflake.role.name

  • snowflake.topic2table.map

{
  "name": "snowflake_kafka_avro_connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "user",
    "snowflake.url.name": "https://phdata.snowflakecomputing.com:443",
    "snowflake.user.name": "**********",
    "snowflake.private.key": "**********",
    "snowflake.private.key.passphrase": "**********",
    "snowflake.database.name": "USER_DGANIGER",
    "snowflake.schema.name": "EXERCISE",
    "snowflake.role.name": "<snowflake role>",
    "snowflake.topic2table.map": "user:kafka_user_data",
    "buffer.count.records": "10000",
    "buffer.size.bytes": "5000000",
    "buffer.flush.time": "60",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
    "snowflake.enable.schematization": "TRUE",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.basic.auth.user.info": "jane.smith:MyStrongPassword"
  }
}

Uploading this configuration file to the Kafka Connect cluster will set up the Snowflake connection and start creating the table with the name specified in the snowflake.topic2table.map configuration.
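To confirm the sink connector started cleanly, its status can also be checked through the Connect REST API. A sketch, again assuming port 8083 on localhost:

# The connector and its task should report a RUNNING state
curl -s http://localhost:8083/connectors/snowflake_kafka_avro_connector/status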

Step 4: Validate the Data in Snowflake

Log in to Snowflake and check the table in the schema specified in the config file. When you initially query the table, it will only have a RECORD_METADATA column, instead of the RECORD_METADATA and RECORD_CONTENT columns of a regular Kafka connector setup. This is because we have set the following configurations in the connector config.

"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"snowflake.enable.schematization": "TRUE"

Both of these configurations must be set together so that Kafka Connect can connect to Snowflake via Snowpipe Streaming and shred the JSON into individual columns. Within a few seconds, we can see the fields in the JSON being loaded into the table as individual columns instead of a single VARIANT column.
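A quick way to spot-check this from the command line (a sketch, assuming the SnowSQL CLI with a named connection my_conn and the database, schema, and table names from the connector config above):

# List the columns the connector has created, then sample a few rows
snowsql -c my_conn -q "
  USE SCHEMA USER_DGANIGER.EXERCISE;
  DESC TABLE KAFKA_USER_DATA;
  SELECT * FROM KAFKA_USER_DATA LIMIT 5;
"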

Step 5: Schema Detection and Evolution

To check how schema detection and evolution work, let's slightly change the Datagen connector config and reload it into Confluent Kafka.

Here is the updated Datagen configuration with two added fields: email and notes. Since the Snowflake Kafka connector is already configured to load into Snowflake, no change is required on that side.

{
  "name": "DatagenConnectorConnector_user_data",
  "config": {
    "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "errors.log.enable": "true",
    "kafka.topic": "user",
    "max.interval": "10000",
    "schema.string": "{\"type\": \"record\",\"name\": \"userInfo\",\"namespace\": \"my.example\",\"fields\": [{\"name\": \"username\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"age\",\"type\": \"int\",\"default\": -1},{\"name\": \"phone\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"housenum\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"street\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"city\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"state_province\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"country\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"zip\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"email\",\"type\": \"string\",\"default\": \"NONE\"},{\"name\": \"notes\",\"type\": \"string\",\"default\": \"NONE\"}]}",
    "schema.keyfield": "username",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "false"
  }
}

Step 6: Validate the Schema Changes

Within a few seconds, we can see that Kafka Connect will start streaming the changes into the table, adding the two new columns, as shown below.

Currently, these features only work for simple JSON scenarios: only the first level of the JSON is flattened into individual columns. For deeply nested JSON, the child fields are loaded into Snowflake as VARIANT columns, and additional processing may be required on the Snowflake side to transform them further.
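For example, if the incoming records carried a nested address object (a hypothetical column, not part of the demo schema above), it would land in a single VARIANT column and could be flattened downstream with a query along these lines, again assuming SnowSQL with a named connection my_conn:

# Flatten a hypothetical nested VARIANT column (address) into relational columns
snowsql -c my_conn -q "
  SELECT
    username,
    address:street::string AS street,
    address:city::string   AS city,
    address:zip::string    AS zip
  FROM USER_DGANIGER.EXERCISE.KAFKA_USER_DATA
  LIMIT 5;
"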

As we saw here, setting up schema detection and evolution in a Kafka Connect configuration is simple and mostly automated. For most straightforward setups, it is worth enabling it this way to reduce further transformations within Snowflake.

Closing

This overview provides a high-level understanding of the schema detection and evolution process in a streaming scenario. In the real world, however, the JSON may be complex and nested, which will require additional transformation after ingesting into Snowflake, typically by setting up pipelines that convert the semi-structured data into a relational format.

At phData, our team of highly skilled data engineers specializes in ETL/ELT processes across various cloud environments. If you require assistance or expertise in data engineering, don’t hesitate to connect with our knowledgeable professionals at phData. 

We’re here to empower your data solutions and ensure their seamless evolution in dynamic production environments.
