How Do I Use StreamSets Test Framework?

StreamSets Test Framework (STF) is a set of Python tools and libraries that enables developers to write integration tests for StreamSets:

  • Data Collector
  • Control Hub
  • Data Protector
  • Transformer


The framework lets you script tests for pipeline-level functionality, pipeline upgrades, the behavior of individual stages, and more, depending on your requirements.

But the best part is that STF is available for free to StreamSets Premium and Enterprise customers! All it takes is a little work up front and you’ll be able to write test cases for any of your pipelines.

This hands-on blog covers the StreamSets Test Framework installation process, how to work within the STF, and how to write tests for pipelines in a registered Data Collector or Transformer within StreamSets Control Hub.

Let’s dive in and get your test framework built!

How to Install StreamSets Test Framework

Getting started with STF installation requires a few steps and prerequisites, which we’ve summed up below.

Installation Requirements

  • Docker 
  • Python 3.6 (StreamSets recommends version 3.6 because some users have reported errors with Python 3.7)
  • Activation key — to use STF, you will need to request a StreamSets SDK activation key
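
As a quick sanity check before installing, the first two requirements can be verified from Python. This is a minimal sketch that uses only the standard library; the helper name check_prerequisites is made up for illustration and is not part of STF, and the activation key is not checked here.

```python
import shutil
import sys


def check_prerequisites(python_version=None, docker_path=None):
    """Return a dict describing whether the STF prerequisites look satisfied.

    Both arguments are optional overrides for illustration and testing;
    by default the running interpreter and the PATH are inspected.
    """
    version = tuple(python_version or sys.version_info[:2])
    docker = docker_path if docker_path is not None else shutil.which("docker")
    return {
        "python_version": version,
        # The post recommends 3.6; other versions are flagged, not rejected.
        "python_recommended": version == (3, 6),
        "docker_found": bool(docker),
    }


if __name__ == "__main__":
    for name, value in check_prerequisites().items():
        print(f"{name}: {value}")
```

If python_recommended comes back False, the installation may still work; the flag only mirrors the recommendation in the list above.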

Installation Steps and Verification

Install STF on the host machine with pip3:
				
					pip3 install streamsets-testframework
				
			
Run the command below to confirm the installation and show the installed version.
				
					stf --version
				
			

Docker Images

The STF command launches most of its sub-commands in a Docker container. To build the image, run:
				
					stf build extras
				
			

If this command fails with the error shown below (version 1.1.0, current at the time of writing, needs a fix), run the command under Build Error Command Fix instead.

Build Error Example

[Screenshot: build error output from stf build extras]

Build Error Command Fix

				
					stf build --build-arg DATABRICKS_JDBC_DRIVER_URL=https://databricks.com/wp-content/uploads/drivers-2020/SimbaSparkJDBC42-2.6.17.1021.zip extras --extra-library databricks
				
			

STF Usage

STF is built on top of the StreamSets SDK for Python and uses pytest as its underlying test executor. It includes a collection of client-facing APIs that facilitate interaction with external environments.

The command below shows more details on usage:

$ stf -h

For the latest information, check the STF documentation. For both a basic understanding and advanced usage of the SDK that STF builds on, see the StreamSets SDK for Python documentation.

What is an STF Shell?

An STF Shell is used to get an interactive shell within the test framework environment and is particularly useful during the test development process, where a user may want to explore the streamsets.testframework package from an interpreter.

Run stf shell, then start the Python interpreter by running the python command.

Connecting to Data Collector and Transformer from Control Hub

The code below works for registered Data Collector and Transformer instances within StreamSets Control Hub.

Here, use your Control Hub credentials to create an instance of streamsets.sdk.ControlHub, then pass it as an argument to streamsets.sdk.DataCollector or streamsets.sdk.Transformer, as shown below:

(Note: You’ll need to replace the argument values according to your setup)
				
from streamsets.sdk import ControlHub, DataCollector, Transformer

# Control Hub session shared by both engine connections below.
sch = ControlHub('https://cloud.streamsets.com', username='<your_username>', password='<your_password>')

# Certificate verification is on by default; disable it only if your setup requires it.
Transformer.VERIFY_SSL_CERTIFICATES = False
st = Transformer(server_url='<your_url>', control_hub=sch)

DataCollector.VERIFY_SSL_CERTIFICATES = False
data_collector = DataCollector('<your_url>', control_hub=sch)

				
			

Accessing Data Collector or Transformer Pipelines and Their Stages From Control Hub

There are two ways to access pipelines. One is directly from Control Hub, using either the pipeline name or the pipeline ID:
				
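pipeline1 = sch.pipelines.get(name='<pipeline_name>')
pipeline1.stages                    # all stages in the pipeline
origin = pipeline1.stages[0]        # assumes the origin is the first stage
destination = pipeline1.stages[1]   # assumes a two-stage pipeline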

				
			
Another way to access these pipelines is directly through Transformer or Data Collector:
				
pipeline2 = st.get_pipeline(pipeline_id='<pipeline_id>')
pipeline2.stages
pipeline3 = data_collector.get_pipeline(pipeline_id='<pipeline_id>')
pipeline3.stages
				
			
You can inspect these objects in the Python interpreter with the built-in dir() function, or by calling the built-in help() function on a class or an instance. A few examples:
				
help(Transformer)
dir(Transformer)
help(origin)

				
			
With the attribute name in hand, you can read the value of the configuration. In the example above, origin.data_format returns the data format configured on the origin stage.
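
The same dir() and getattr() pattern can be tried without a running Control Hub. The sketch below uses a stand-in class, FakeStage, which is hypothetical and not part of the SDK; real stage objects expose many more attributes, and the attribute values here are invented.

```python
class FakeStage:
    """Hypothetical stand-in for an SDK stage object (not part of the SDK)."""

    def __init__(self):
        self.instance_name = "DevRawDataSource_01"
        self.data_format = "JSON"
        self._internal = "hidden"


def public_attributes(obj):
    """List attribute names that do not start with an underscore."""
    return sorted(name for name in dir(obj) if not name.startswith("_"))


origin = FakeStage()
names = public_attributes(origin)
print(names)  # ['data_format', 'instance_name']

# With a name in hand, getattr() reads the corresponding value.
for name in names:
    print(name, "=", getattr(origin, name))
```

Filtering out underscore-prefixed names keeps the listing focused on the attributes meant for callers.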

Connecting to Test Environments

Now that we have covered how to connect to Data Collector and how the STF shell works, everything is in place to run tests against various platforms. The following sections hint at the available options, and the rest of this post focuses on testing with Databricks.

Further integrations are possible through StreamSets-provided APIs that connect to sources such as HDFS, Azure, AWS, Spark, Kafka, and databases.

Databricks and Listing Files

At the time of writing, there is no publicly available documentation on how to connect to Databricks from STF.

Because many clients use Databricks, here is how to connect to it from STF and work with it.

				
import os
# Token used to authenticate against Databricks.
os.environ['DATABRICKS_TOKEN'] = '<your_token>'
from streamsets.testframework.environments.databricks import DatabricksInstance

db = DatabricksInstance('<databricks_url>', cluster_id='<cluster_id>')
db.fs.list('<location>')  # list the files at the given location
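
However the token ends up in the environment, a small guard can fail with a readable message when it is missing or blank. This is a sketch only; require_env is a hypothetical helper, not part of STF, and the second argument exists so the function can be tried with a plain dict.

```python
import os


def require_env(name, environ=None):
    """Return the value of an environment variable or raise a clear error."""
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value


# Demonstration with an explicit mapping instead of the real environment.
print(require_env("DATABRICKS_TOKEN", {"DATABRICKS_TOKEN": "<your_token>"}))
```

Called as require_env('DATABRICKS_TOKEN') with no second argument, it reads from os.environ.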

				
			

Cloudera and Listing Files

Clients using Cloudera Spark connect in a similar way. Here is how to connect to it from STF and work with it.
				
import os
os.environ['CLOUDERA_CONFIGS'] = '<core-site.xml>'
from streamsets.testframework.environments.cloudera import ClouderaHdfs

cldr = ClouderaHdfs('<cloudera_cluster_name>', '<service_name>')
cldr.fs.list('<location>')
				
			

How to Build Your Own Docker Image to Use With STF Tests

To import extra modules (such as pyarrow or pandas) in your tests, build a Docker image with those modules installed. Create an empty folder, set it as your working directory, and add a file called “Dockerfile” that looks like this:
				
FROM streamsets/testframework:latest
RUN pip install pyarrow
RUN pip install pandas
				
			
To build an image from the Dockerfile in the current directory and tag the image, run this command from within the folder:
				
					docker build -t streamsets/testframework:<anyname> .
				
			

To use your image when running STF, add these options to your command:

				
					stf --docker-image-dont-pull --docker-image streamsets/testframework:<above_given_name> <rest of your existing command here>
				
			

STF Test

STF tests are executed by changing into the directory that contains the test source and then running the stf test command. Any arguments after test are passed into the Test Framework container as arguments to the pytest command.

Listed below are three common examples of STF tests:

Test Your Metric Count

The first example runs a test case on metric count by creating a job then starting, running, stopping, and deleting it in Control Hub for the assigned pipeline in your Data Collector or Transformer instance.

Prerequisites: the name of any working pipeline.

Place the following code in <anyfilename.py>:

				
import pytest


def pytest_addoption(parser):
    # Registers the --pipeline-name option read by the pipeline fixture.
    parser.addoption('--pipeline-name')


@pytest.fixture(scope='session')
def sch(sch_session):
    yield sch_session


@pytest.fixture(scope='session')
def pipeline(sch_session, request):
    pipeline_name = request.config.getoption('pipeline_name')
    pipeline_ = sch_session.pipelines.get(name=pipeline_name)
    yield pipeline_


def test_run_job(sch, pipeline):
    # Assert on the metric count by creating a job and running it.
    job_builder = sch.get_job_builder()
    job = job_builder.build(job_name='stf_test', pipeline=pipeline)
    job.data_collector_labels = ['dev']  # replace with the labels of the instance you want to run on
    try:
        sch.add_job(job)
        sch.start_job(job)
        current_status = sch.get_current_job_status(job).response.json().get('status')
        assert current_status == 'ACTIVE'
        job_metrics = sch.jobs.get(job_name='stf_test')
        output = job_metrics.metrics(metric_type='RECORD_COUNT', include_error_count=True).output_count
        assert output is not None
    finally:
        # Clean up the job even if an assertion fails.
        sch.stop_job(job)
        sch.delete_job(job)

				
			
Remember to replace argument values with your details to run these tests!
				
stf test <filename.py> \
    --sch-server-url 'https://cloud.streamsets.com/' \
    --sch-username '<user_name>' --sch-password '<password>' \
    --pipeline-name <your_pipeline_name>
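
The metric-count test reads the job status once, right after start_job. Whether that call blocks until the job is active is not covered here, so treat the following as an assumption: if a status check ever runs before the job has finished starting, a small polling helper could retry it. The helper wait_for is hypothetical and not part of STF or the SDK, and the status strings in the demonstration are stand-ins.

```python
import time


def wait_for(predicate, timeout=60.0, interval=2.0, sleep=time.sleep, clock=time.monotonic):
    """Call predicate() until it returns a truthy value or timeout seconds pass.

    Returns True on success and False on timeout. sleep and clock are
    injectable so the helper can be exercised without real waiting.
    """
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(interval)


# Stand-in for repeated status lookups; a real test would query Control Hub here.
statuses = iter(["INACTIVE", "INACTIVE", "ACTIVE"])
became_active = wait_for(lambda: next(statuses) == "ACTIVE", timeout=5, interval=0)
print(became_active)  # True
```

In the test, the predicate would wrap the get_current_job_status call shown earlier, and the assertion would check the return value of wait_for.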
				
			

Test a Pipeline in Transformer or Data Collector

This test previews the pipeline and retrieves a stage's output values for assertions.

Prerequisites: a pipeline that already exists in your Transformer or Data Collector instance, with a Dev Raw Data Source origin and a Trash destination. The Dev Raw Data Source uses the JSON data format and contains the following data:

				
					[
  {
	"name": "Apple",
	"age": 30,
	"salary": 1000
  },
  {
	"name": "Mac",
	"age": 25,
	"salary": 1500
  }
]

				
			

Fixture functions defined in a conftest.py file are available to every test file in the same directory.

Create a conftest.py file in the directory that holds your tests and add the following code to it:

				
import pytest


def pytest_addoption(parser):
    parser.addoption('--pipeline-name')


@pytest.fixture(scope='session')
def sch(sch_session):
    yield sch_session


@pytest.fixture(scope='session')
def pipeline(sch_session, request):
    pipeline_name = request.config.getoption('pipeline_name')
    pipeline_ = sch_session.pipelines.get(name=pipeline_name)
    yield pipeline_
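
The pipeline fixture reads the option as pipeline_name although it is registered as --pipeline-name. The pytest documentation describes parser.addoption as taking the same attributes as argparse's add_argument, so the standard library can illustrate the naming rule. This is an analogy run with argparse, not pytest's own code path, and my_pipeline is a placeholder value.

```python
import argparse

parser = argparse.ArgumentParser()
# Same option string as in conftest.py; argparse derives the name "pipeline_name".
parser.add_argument("--pipeline-name")

args = parser.parse_args(["--pipeline-name", "my_pipeline"])
print(args.pipeline_name)  # my_pipeline
```

The leading dashes are dropped and the inner dash becomes an underscore, which is why the fixture asks for pipeline_name.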

				
			

Place the following code in your test file (<filename.py> in the run command below):

				
def test_preview_and_validation(sch, pipeline):
    '''Preview of pipeline with events.
    Ensure that there are no validation or other issues.'''
    pipeline.configuration['shouldRetry'] = False
    preview = sch.run_pipeline_preview(pipeline).preview
    assert preview is not None
    assert preview.issues.issues_count is None

    # Assert on the origin data returned by the preview.
    origin = pipeline.stages[0]
    data = preview[origin.instance_name].output
    assert len(data) == 2
    assert data[0].field['name'] == 'Apple'
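
The two data assertions in this test can be tried against the sample records without a running pipeline. The sketch below uses plain dicts loaded with the json module as stand-ins for preview records; real preview output is accessed through the field attribute as shown in the test.

```python
import json

SAMPLE = """
[
  {"name": "Apple", "age": 30, "salary": 1000},
  {"name": "Mac", "age": 25, "salary": 1500}
]
"""

records = json.loads(SAMPLE)

# Mirror the checks made against the preview output, using plain dicts.
assert len(records) == 2
assert records[0]["name"] == "Apple"

# Field names of one record, in document order.
field_names = list(records[1].keys())
print(field_names)  # ['name', 'age', 'salary']
```

A list of field names like this one is also what the schema comparison in the third example works with.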
				
			

Don’t forget to replace argument values with your details to run these tests!

				
stf test <filename.py> \
    --sch-server-url 'https://cloud.streamsets.com/' \
    --sch-username '<user_name>' --sch-password '<password>' \
    --pipeline-name <your_pipeline_name>
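
When the same command is run for several test files, it can help to assemble it in one place. The sketch below builds the argument list from the options used in this post and quotes it for a shell; build_stf_test_command is a hypothetical helper, and the file name, user, password, and pipeline name are placeholders.

```python
import shlex


def build_stf_test_command(test_file, sch_url, username, password, pipeline_name, docker_image=None):
    """Assemble an stf test argument list from the options used in this post."""
    argv = ["stf"]
    if docker_image:
        # Use a locally built image instead of pulling the default one.
        argv += ["--docker-image-dont-pull", "--docker-image", docker_image]
    argv += [
        "test", test_file,
        "--sch-server-url", sch_url,
        "--sch-username", username,
        "--sch-password", password,
        "--pipeline-name", pipeline_name,
    ]
    return argv


argv = build_stf_test_command(
    "test_preview.py", "https://cloud.streamsets.com/", "user@example.com", "s3cret pw", "My Pipeline"
)
# shlex.quote protects values that contain spaces or shell metacharacters.
print(" ".join(shlex.quote(part) for part in argv))
```

Passing the list to subprocess.run avoids shell quoting altogether; the joined string is mainly useful for logging or copying into a terminal.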
				
			

Test Your Connection and Compare Schema

In this example, we connect to Databricks, access files, and compare the schema of existing files in Databricks with the schema produced by the pipeline's origin stage.

Prerequisites: a pipeline that already exists in your Transformer or Data Collector instance, with a Dev Raw Data Source origin and a Delta Lake destination. The Dev Raw Data Source uses the JSON data format and contains the data below (the same data as in the previous example):

				
					[
  {
	"name": "Apple",
	"age": 30,
	"salary": 1000
  },
  {
	"name": "Mac",
	"age": 25,
	"salary": 1500
  }
]
				
			
Place the following code in your test file (<filename.py> in the run command below):
				
import os

import pyarrow as pa
import pyarrow.parquet as pq
import pytest
from streamsets.testframework.markers import cluster

os.environ['DATABRICKS_TOKEN'] = '<your_token>'


@cluster('databricks')
def test_databricks_compare_schema(databricks_cluster, sch, pipeline):
    pipeline.configuration['shouldRetry'] = False
    # Run a pipeline preview and collect the origin's field names.
    preview = sch.run_pipeline_preview(pipeline).preview
    origin = pipeline.stages[0]
    destination = pipeline.stages[1]
    prev_data = preview[origin.instance_name].output
    origin_schema = list(prev_data[1].field.keys())

    db_fs = databricks_cluster.fs
    path = destination.configuration['conf.deltaLakeTable']
    # Only compare when files already exist at the destination.
    list_files = db_fs.list_files(path)
    if len(list_files) > 0:
        file = list_files[0]
        file_data = db_fs.read_raw(file)
        reader = pa.BufferReader(file_data)
        table = pq.read_table(reader)
        df = table.to_pandas()
        dest_schema = df.columns.tolist()
        # Compare the origin schema with the files that already exist in Databricks.
        assert origin_schema == dest_schema
        print('---Schema Matched---')


@pytest.fixture(scope='function')
def databricks_cluster(cluster):
    yield cluster
				
			
Then run the test with this command:
				
					stf --docker-image-dont-pull --docker-image streamsets/testframework:<docker_image_name>  test --databricks-instance-url <databricks-instance-url> --sch-server-url 'https://cloud.streamsets.com/' --sch-username '<user_name>' --sch-password '<password>' --pipeline-name <your_pipeline_name> -vs <filename.py>
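
The Databricks test asserts origin_schema == dest_schema on two lists, which also requires the columns to appear in the same order. When that assertion fails, a more detailed report can help. The helper below is a sketch in plain Python; compare_schemas is hypothetical and not part of STF, and the bonus column is invented for the demonstration.

```python
def compare_schemas(origin_fields, dest_fields):
    """Compare two lists of field names and report how they differ."""
    origin_set, dest_set = set(origin_fields), set(dest_fields)
    return {
        "missing_in_dest": sorted(origin_set - dest_set),
        "extra_in_dest": sorted(dest_set - origin_set),
        "same_fields": origin_set == dest_set,
        "same_order": list(origin_fields) == list(dest_fields),
    }


result = compare_schemas(["name", "age", "salary"], ["age", "name", "salary", "bonus"])
print(result)
```

Whether column order should count as a mismatch depends on the use case; asserting on same_fields ignores order, while same_order matches the strict comparison used in the test.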
				
			

We hope you now have a better understanding of how to build and operate within the StreamSets Test Framework.
