According to the latest estimates, 402.74 million terabytes of data are generated around the globe every day. This includes data that is newly created, captured, copied, consumed, and even purged. Naturally, this leads to the question: why is there so much data, and how do we manage it all?
While the answer is complex, a simpler and equally important question is: can we process and extract insights from this vast ocean of data in real time?
The answer to this is a resounding yes.
Thanks to advancements in cloud technology and significant improvements in computational power, we now have the ability to visualize and analyze data as it is generated, with minimal lag. This incredible capability is known as Real-Time Data Analytics, and it is revolutionizing the way we understand and utilize information.
In this blog, we will uncover:
What is real-time analytics, and why is it so important?
How to implement a real-time analytics use case using AWS and Snowflake.
A demo of the implementation.
What is Real-Time Data Analytics?
Real-time data analytics refers to the continuous processing and analysis of data the moment it is generated, providing instant insights and facilitating immediate actions. This approach enables organizations to swiftly derive valuable information from rapidly changing datasets, ultimately improving decision-making and allowing agile responses to evolving market trends, customer behavior, operational challenges, and more.
Key Attributes:
Minimal Latency: Data is processed swiftly, ranging from milliseconds to seconds.
Ongoing Data Streams: Leverages streaming data from sources such as IoT devices, logs, transactions, social media, and more.
Scalability: Capable of dynamically managing large data volumes.
Technologies Involved:
Streaming Tools and Platforms: Snowpipe, AWS Kinesis, Apache Kafka, Apache Flink, Google Pub/Sub, etc.
Databases & Data Stores: Snowflake, AWS Redshift, Apache Druid, ClickHouse, etc.
Processing Engines: AWS Lambda, Apache Spark Streaming, Google Dataflow, etc.
Dashboards & Visualization: Tableau, Power BI, Qlik, etc.
The Purpose and Applications of Real-Time Analytics
Below are a few of the most common (and important) applications where real-time analytics can be leveraged:
Fraud Detection: Financial institutions use real-time analytics to identify fraudulent transactions.
IoT Monitoring: Smart devices and sensors, such as health monitors, stream data for condition monitoring and predictive maintenance.
Stock Market Analysis: Traders assess price movements and execute trades.
Customer Engagement: Companies tailor offers based on live user interactions.
Building a Real-Time Data Analytics Solution using Snowflake and AWS
Use Case
In this blog, we will explore a practical application of IoT Smart Device Monitoring. Consider a Smart Health Monitor that, when attached to a patient’s arm, continuously records vital signs such as pulse rate, blood pressure, heart rate, SpO2 levels, and respiratory rate.
The IoT device continuously feeds this data to an AWS Kinesis stream (we will limit it to every 15-30 seconds for this demo), which in turn is loaded into Snowflake in real time, enabling analysis and monitoring through a dashboard in Tableau.
It is important to note that this solution is designed to complement traditional medical monitoring and diagnostic tools, not replace them. It serves as an additional resource for healthcare providers to monitor their patients at their convenience for research, diagnosis, and other medical purposes.
Architecture
Below is what the high-level data flow looks like for this use case:
Snowflake Setup
First things first, we’ll need to set up Snowflake.
Create Database –
create database HEALTH_DATA;
Create Schema –
create schema HEALTH_DATA.SENSORS;
Create Tables – For this use case, we will create a table called remote_health_sensors, which will store data from the IoT device. We will also have two dimension tables populated with synthetic data for reporting purposes. Below are the DDLs of the tables:
/* Create dimension table for doctor information */
/* Stores detailed information about medical practitioners */
CREATE OR REPLACE TABLE dim_doctor_info (
    doctor_id DECIMAL(38, 0),           /* Unique identifier for each doctor */
    first_name VARCHAR(50),             /* Doctor's first name */
    last_name VARCHAR(50),              /* Doctor's last name */
    specialty VARCHAR(50),              /* Medical specialty/expertise */
    license_number VARCHAR(20),         /* Medical license identification */
    office_phone VARCHAR(12),           /* Contact number for doctor's office */
    email VARCHAR(100),                 /* Professional email address */
    department VARCHAR(50),             /* Department association */
    hospital_affiliation VARCHAR(100)   /* Affiliated hospital/medical center */
);
/* Create dimension table for patient information */
/* Stores demographic and contact information for patients */
CREATE OR REPLACE TABLE dim_patient_info (
    patient_id DECIMAL(38, 0),          /* Unique identifier for each patient */
    sensor_id DECIMAL(38, 0),           /* Associated sensor ID */
    doctor_id DECIMAL(38, 0),           /* Associated doctor ID */
    first_name VARCHAR(50),             /* Patient's first name */
    last_name VARCHAR(50),              /* Patient's last name */
    date_of_birth DATE,                 /* Patient's birth date */
    gender VARCHAR(1),                  /* Patient's gender */
    blood_type VARCHAR(3),              /* Patient's blood type */
    emergency_contact VARCHAR(100),     /* Emergency contact information */
    phone_number VARCHAR(12)            /* Patient's contact number */
);
/* Create main table for health sensor data */
/* Stores real-time health metrics from remote monitoring devices */
CREATE OR REPLACE TABLE remote_health_sensors (
    sensorid DECIMAL(38, 0),                        /* Unique identifier for each sensor */
    sensortype VARCHAR(50),                         /* Type of health monitoring sensor */
    internetip VARCHAR(15),                         /* IP address of the sensor */
    connectiontime TIMESTAMP_NTZ,                   /* Time of sensor data transmission */
    currenttemperature DECIMAL(38, 0),              /* Patient's current temperature reading */
    currenttemperatureunit VARCHAR(10),             /* Temperature measurement unit */
    currentsystolicbloodpressure DECIMAL(38, 0),    /* Systolic blood pressure reading */
    currentsystolicbloodpressureunit VARCHAR(10),   /* BP measurement unit */
    currentdiastolicbloodpressure DECIMAL(38, 0),   /* Diastolic blood pressure reading */
    currentdiastolicbloodpressureunit VARCHAR(10),  /* BP measurement unit */
    currentpulserate DECIMAL(38, 0),                /* Current pulse rate reading */
    currentpulserateunit VARCHAR(10),               /* Pulse rate measurement unit */
    currentrespiratoryrate DECIMAL(38, 0),          /* Respiratory rate reading */
    currentrespiratoryrateunit VARCHAR(10),         /* Respiratory rate unit */
    currentspo2level DECIMAL(38, 0),                /* Blood oxygen saturation level */
    currentspo2levelunit VARCHAR(10)                /* SpO2 measurement unit */
);
AWS Setup
Next up, we’ll follow these steps to set up the AWS services and connect them to the Snowflake table for storing the data in real-time:
Set up Amazon Kinesis
Search for “Amazon Kinesis” in the search bar of the AWS Management Console.
Select the Create Data Stream option in the Kinesis Window.
Give your stream a suitable name and choose On-Demand or Provisioned capacity mode based on the criticality and complexity of the use case; for this demo, we will select On-Demand. Add tags if you need them, and then click on Create Data Stream.
We have now successfully created a Kinesis Data Stream which will receive real-time data from the IoT device.
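If you prefer to script this step instead of clicking through the console, the same stream can be created with boto3. Below is a minimal sketch; the stream name and region are placeholders, so substitute your own values.
import boto3

# Placeholder values - replace with your own stream name and region
STREAM_NAME = "smart-health-monitor-stream"
REGION = "us-east-1"

kinesis = boto3.client("kinesis", region_name=REGION)

# Create an on-demand data stream (no shard capacity planning needed for a demo)
kinesis.create_stream(
    StreamName=STREAM_NAME,
    StreamModeDetails={"StreamMode": "ON_DEMAND"}
)

# Block until the stream is ACTIVE and ready to receive records
kinesis.get_waiter("stream_exists").wait(StreamName=STREAM_NAME)
summary = kinesis.describe_stream_summary(StreamName=STREAM_NAME)
print(summary["StreamDescriptionSummary"]["StreamStatus"])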
Set up Amazon Data Firehose
Once the Kinesis Stream is created, go back to the Kinesis console and click on Amazon Data Firehose. Once inside the Amazon Data Firehose Console, click on Create Firehose stream. We will now start creating a Firehose stream that will take the data from Kinesis and put it into Snowflake.
Select the source as Amazon Kinesis Data Streams and the destination as Snowflake. We don’t need to change the Firehose stream name, but if we do, it must be unique within our AWS account and Region. In the Source settings, browse and select the Kinesis Data Stream we created in the previous step.
In the Destination Settings, add your Snowflake Account URL (if you are using a PrivateLink between AWS and Snowflake, use the privatelink-account-url value from the select SYSTEM$GET_PRIVATELINK_CONFIG() statement instead of the normal Account URL) as shown below. Enter your Snowflake Username and the Private Key associated with it.
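Firehose authenticates to Snowflake with key-pair authentication, so the user entered here must have an RSA key pair configured. If you don’t have one yet, the sketch below generates a 2048-bit pair using the Python cryptography package; this is a rough sketch, and the exact format expected in the Firehose Private Key field (typically the PEM body without the header/footer lines) should be confirmed when you paste it in. Store the private key securely.
import base64
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

# Generate a 2048-bit RSA key pair for Snowflake key-pair authentication
key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# Unencrypted PKCS#8 private key in PEM format - this is the value
# supplied to Firehose as the Private Key
private_pem = key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
).decode()

# Base64-encoded public key to register on the Snowflake user, e.g.:
#   ALTER USER <firehose_user> SET RSA_PUBLIC_KEY='<public_key_b64>';
public_key_b64 = base64.b64encode(
    key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo
    )
).decode()

print(private_pem)
print(public_key_b64)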
We can now add the Snowflake Role we want to use, and if we are using a PrivateLink between AWS and Snowflake, we will add the VPCE ID (the privatelink-vpce-id from the select SYSTEM$GET_PRIVATELINK_CONFIG() statement). Finally, we will specify the Snowflake database, schema, and table name, which were created in the Snowflake Setup step, as shown below. We can keep the rest of the options as default.
Additionally, we will add an S3 bucket, which will store any source records that are rejected while loading into Snowflake from the Firehose stream.
Once all of this is done, we will click on Create Firehose stream. The resulting stream will consume data from Kinesis and load it into Snowflake in real time.
We have successfully created a real-time data integration pipeline to load data from an IoT device into Snowflake!
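For teams that prefer infrastructure-as-code over console clicks, the same delivery stream can also be created with boto3. The sketch below mirrors the console settings described above; every name, ARN, URL, and credential is a placeholder, and the parameter names follow the Firehose SnowflakeDestinationConfiguration API, so verify them against your boto3 version before use.
import boto3

firehose = boto3.client("firehose", region_name="us-east-1")  # assumed region

# All names, ARNs, URLs, and credentials below are placeholders
firehose.create_delivery_stream(
    DeliveryStreamName="health-monitor-to-snowflake",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-east-1:123456789012:stream/smart-health-monitor-stream",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-kinesis-source-role"
    },
    SnowflakeDestinationConfiguration={
        "AccountUrl": "https://<account_identifier>.snowflakecomputing.com",
        "User": "FIREHOSE_USER",
        "PrivateKey": "<private key material>",
        "Database": "HEALTH_DATA",
        "Schema": "SENSORS",
        "Table": "REMOTE_HEALTH_SENSORS",
        "SnowflakeRoleConfiguration": {"Enabled": True, "SnowflakeRole": "SYSADMIN"},
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
        # Rejected source records are backed up to this S3 bucket
        "S3BackupMode": "FailedDataOnly",
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::123456789012:role/firehose-delivery-role",
            "BucketARN": "arn:aws:s3:::health-monitor-firehose-errors"
        }
    }
)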
Simulate IoT Device Using Python
Since we don’t have a live Smart Health Monitoring device for this demo, we have built a basic simulator in Python. It generates health data for 10 dummy patients every 15-30 seconds and feeds it into the Amazon Kinesis Data Stream.
Here’s the code in Python to do this:
import json
import random
import time
import uuid
import boto3
import os
from datetime import datetime
from dotenv import load_dotenv
import sys

# Load environment variables from .env file
load_dotenv()

# Send a single health reading to the Kinesis data stream
def send_to_kinesis(data, stream_name):
    # Pretty print the data being sent for debugging/monitoring purposes
    #print(json.dumps(data, indent=4))

    # Initialize Kinesis client with AWS credentials and region from environment variables
    # This client will handle all interactions with AWS Kinesis service
    kinesis_client = boto3.client(
        'kinesis',
        region_name=os.getenv('AWS_REGION')
    )
    try:
        # Attempt to send the record to Kinesis stream
        # Parameters:
        # - StreamName: The name of your Kinesis stream
        # - Data: The actual data to send (must be converted to JSON string)
        # - PartitionKey: Determines which shard the data goes to (random number used here)
        response = kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey=str(random.randint(1, 100))  # Random partition key between 1-100
        )
        # Return the response from AWS which includes:
        # - SequenceNumber: Unique identifier for the record
        # - ShardId: The shard where the data was written
        return response
    except Exception as e:
        # If any error occurs during the sending process:
        # 1. Print the error message
        # 2. Re-raise the exception to be handled by the calling function
        print(f"Error sending record to Kinesis: {str(e)}")
        raise

def main():
    # Simulate data for 10 different devices
    devices = [
        {
            "ID": 28765192,
            "IP": "149.218.116.155",
            "TYPE": "SmartHealthMonitor"
        }
        # add random data for 9 other different devices
    ]
    # Iterate through each device in our devices list
    for i in devices:
        # Create a data payload for each sensor/device
        # This represents one reading from a smart health monitoring device
        sample_data = {
            # Basic device identification
            "sensorId": i["ID"],      # Unique identifier for each sensor
            "sensorType": i["TYPE"],  # Model/type of the health monitor
            "internetIP": i["IP"],    # IP address of the device
            # Timestamp of the reading in ISO format (YYYY-MM-DDTHH:MM:SS)
            "connectionTime": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
            # Vital signs measurements with their respective units
            # Body temperature (range for simulator: 95-100°F)
            "currentTemperature": random.randint(95, 100),
            "currentTemperatureUnit": 'F',
            # Heart rate (range for simulator: 50-100 bpm)
            "currentPulseRate": random.randint(50, 100),
            "currentPulseRateUnit": 'Beats/Minute',
            # Blood oxygen level (range for simulator: 95-100%)
            "currentSpO2Level": random.randint(95, 100),
            "currentSpO2LevelUnit": '%',
            # Breathing rate (range for simulator: 10-15 breaths per minute)
            "currentRespiratoryRate": random.randint(10, 15),
            "currentRespiratoryRateUnit": 'Breaths/Minute',
            # Blood pressure readings
            # Systolic (range for simulator: 100-150 mmHg)
            "currentSystolicBloodPressure": random.randint(100, 150),
            "currentSystolicBloodPressureUnit": 'mmHg',
            # Diastolic (range for simulator: 60-90 mmHg)
            "currentDiastolicBloodPressure": random.randint(60, 90),
            "currentDiastolicBloodPressureUnit": 'mmHg'
        }
        try:
            # Attempt to send the generated data to AWS Kinesis
            # Stream name is retrieved from environment variables
            response = send_to_kinesis(sample_data, os.getenv('KINESIS_STREAM_NAME'))
            print("Sent data for Sensor ID:", i["ID"])
            # Response contains sequence number and shard ID (commented out to reduce console clutter)
            #print(response)
        except Exception as e:
            # If any error occurs during sending, print the error message
            print(f"Error sending record to Kinesis: {e}")

if __name__ == "__main__":
    # Display startup message to indicate the simulator is beginning
    print("Starting Smart Health Monitor Device Signal Simulator...")
    # Inform user how to gracefully terminate the program
    print("Press Ctrl+C to stop the program")
    try:
        # Infinite loop to continuously simulate device signals
        while True:
            # Execute the main function which generates and sends data for all devices
            main()
            # Pause for a random 15-30 seconds between readings
            # This simulates real-world device behavior where vital signs
            # are measured and transmitted at regular intervals
            # Note: time.sleep() can be interrupted by Ctrl+C
            time.sleep(random.randint(15, 30))
    except KeyboardInterrupt:
        # Handle the Ctrl+C keyboard interrupt
        # This provides a graceful shutdown when user wants to stop the simulator
        print("\nProgram stopped by user")
        # Exit with status code 0 indicating successful/normal termination
        sys.exit(0)
    except Exception as e:
        # Catch any other unexpected errors that weren't specifically handled
        # This is the last line of defense for error handling
        print(f"Unexpected error: {str(e)}")
        # Exit with status code 1 indicating abnormal termination
        sys.exit(1)
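Once the simulator is running, you can confirm that records are landing in the target table with a quick query. Here is a minimal sketch using the snowflake-connector-python package; the connection parameters are placeholders for your own account and credentials.
import snowflake.connector

# Placeholder connection details - replace with your own account and credentials
conn = snowflake.connector.connect(
    account="<account_identifier>",
    user="<username>",
    password="<password>",
    warehouse="<warehouse>",
    database="HEALTH_DATA",
    schema="SENSORS"
)

cur = conn.cursor()
# Row count and latest reading time tell us whether Firehose is delivering data
cur.execute("select count(*), max(connectiontime) from remote_health_sensors")
row_count, latest_reading = cur.fetchone()
print(f"Rows loaded: {row_count}, latest reading at: {latest_reading}")

cur.close()
conn.close()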
Demo Video
Once all integrations are complete, we will conduct a demo using 10 dummy IoT devices. To illustrate the impact of real-time data generation, loading, and analytics, we’ve created a short video demonstrating the entire process from start to finish. Check out the video below to see how it all works.
Closing
Building a real-time data analytics platform with Snowflake and AWS empowers companies to utilize fast, scalable, and cost-effective data processing capabilities. By integrating AWS services such as Kinesis and Firehose with Snowflake’s advanced analytics engine, businesses can effortlessly ingest, process, and visualize streaming data nearly instantaneously.
As data-driven decision-making becomes increasingly vital for staying competitive, adopting a strong cloud-based architecture ensures that businesses can quickly respond to insights, optimize their operations, and improve customer experiences. Whether you are monitoring IoT devices, detecting fraud, or personalizing user interactions, a well-designed real-time analytics platform forms the foundation for innovation and efficiency.
Are you prepared to elevate your data strategy?
Begin experimenting with Snowflake and AWS today to fully harness the power of real-time analytics.
FAQs
What is Snowpipe?
Snowpipe is a cloud-native, serverless data ingestion service from Snowflake that seamlessly loads data from files as soon as they appear in a stage. This enables near real-time data loading into Snowflake tables. As a fully managed service, Snowpipe automates the ingestion process from multiple cloud storage platforms, including AWS S3, Azure Blob Storage, and Google Cloud Storage.
Is Snowflake an OLAP or OLTP?
Snowflake is a cloud-based data platform designed for efficient data storage, processing, and analytics. It is primarily used for Online Analytical Processing (OLAP), a system that enables businesses to analyze large and complex datasets to derive valuable insights. In addition to OLAP, Snowflake supports Hybrid Transactional and Analytical Processing (HTAP) through its Hybrid Tables, allowing seamless integration of both OLAP and Online Transaction Processing (OLTP) workloads. This capability enables businesses to run real-time analytics on transactional data without requiring complex data movement or transformations, enhancing performance and decision-making efficiency.