Continuous Integration for Streaming jobs

Arpit Samdani, Mohit Ritolia and Vikash Pandey
10 min read
Posted on April 10, 2023

CI/CD for streaming jobs can be achieved by using tools like Harness and Jenkins to automate the build and test phases as well as the deployment to a streaming data processing system such as Apache Flink, Apache Kafka Streams, or Apache Spark Streaming. The goal is to help developers quickly iterate on their code changes and safely deploy them to production with minimal downtime or risk of errors.

The Structured Streaming Architecture

The first entry point for data in the architecture is the EventHub, which is consumed by the Spark Streaming job and written out as a Delta Lake table.

Spark Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. The Spark engine runs the query incrementally: it applies the transformations, continuously joins data from all the sources, and updates the final result as streaming data continues to arrive.
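For illustration, here is a minimal sketch of this ingestion path in Java; the connection string, table path, and checkpoint location are placeholders, and the read options follow the azure-event-hubs-spark connector rather than our actual job code.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public class EventHubToDeltaJob {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("eventhub-to-delta")
                .getOrCreate();

        // Read the raw event stream from EventHub (the connection string is a placeholder;
        // recent versions of the azure-event-hubs-spark connector expect it to be
        // encrypted with EventHubsUtils.encrypt before being passed as an option).
        Dataset<Row> events = spark.readStream()
                .format("eventhubs")
                .option("eventhubs.connectionString", "<event-hub-connection-string>")
                .load();

        // Continuously append the (optionally transformed) stream to a Delta Lake table.
        StreamingQuery query = events.writeStream()
                .format("delta")
                .option("checkpointLocation", "/mnt/checkpoints/billed_events")
                .start("/mnt/delta/billed_events");

        query.awaitTermination();
    }
}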

Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions and schema management to a data lake, allowing you to store structured and unstructured data at any scale. We can store our data the way it is, without having to structure it first, and run different types of analytics on it, from dashboards and visualization to big-data processing and real-time analytics.
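Continuing the sketch above, the same table the streaming job appends to can later be read in batch for analytics; the path and column name below are illustrative.

// Batch-read the Delta table that the streaming job keeps appending to.
Dataset<Row> billedEvents = spark.read()
        .format("delta")
        .load("/mnt/delta/billed_events");

// Example ad-hoc aggregation for a dashboard.
billedEvents.groupBy("publisherId").count().show();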

Regression

Regression testing ensures that the best version of your product is released. With continuous deployment of Spark Streaming jobs, it is very important to have high confidence in the job and to ensure that the measures calculated/generated from the transformed data are as expected for all the different source streams.

Why is Regression Required?

  • It increases the chance of detecting bugs caused by changes and also makes sure all the output generated from the input set matches the expected result.

  • It can help catch defects early and thus reduce the cost to resolve them.

  • Helps uncover unwanted side effects that might have occurred due to a new operating environment.

  • Ensures better-performing software due to early identification of bugs and errors.

  • Most importantly, it verifies that code changes do not re-introduce old defects.

Regression Components and Architecture

Let’s take a look at the different components.

Test Case Generator

Its purpose is to generate a test case that enriches all the dimensions based on the test case input CSV and to serialize it into a thrift message. It also stores the input and output in a Local Cache Map.

public static BilledEvent generateBillingInput(String[] csvRecord) throws Exception {
    BilledEvent billedEvent = new BilledEvent();
    RequestCommonDimensions requestCommonDimensions = new RequestCommonDimensions();
    ImpressionCommonDimensions impressionCommonDimensions = new ImpressionCommonDimensions();
    BidCommonReportingDimensions bidCommonReportingDimensions = new BidCommonReportingDimensions();

    // The event arrival time is taken directly from the input CSV record.
    if (null != csvRecord[POSTSERVE_EVENT_TIME]) {
        billedEvent.setEventArrivalTime(Long.parseLong(csvRecord[POSTSERVE_EVENT_TIME]));
    }

    // If the "empty dimensions" flag is set, serialize the dimensions as-is;
    // otherwise enrich them from the CSV columns before serializing.
    if (null != csvRecord[POSTSERVE_REQUESTCOMMONDIMENSIONS_FLAG]) {
        if (csvRecord[POSTSERVE_REQUESTCOMMONDIMENSIONS_FLAG].equals(TRUE)) {
            billedEvent.setRequestCommonDimensions(ThriftEventParser.serializeEvent(requestCommonDimensions));
        } else {
            if (null != csvRecord[POSTSERVE_PUBLISHER_ID] && !csvRecord[POSTSERVE_PUBLISHER_ID].isEmpty()) {
                requestCommonDimensions.setPublisherId(csvRecord[POSTSERVE_PUBLISHER_ID]);
            }

            if (null != csvRecord[POSTSERVE_DEVICE_TYPE] && !csvRecord[POSTSERVE_DEVICE_TYPE].isEmpty()) {
                requestCommonDimensions.setDeviceType(Short.parseShort(csvRecord[POSTSERVE_DEVICE_TYPE]));
            }

            if (null != csvRecord[POSTSERVE_UMP_HOST_SLOT] && !csvRecord[POSTSERVE_UMP_HOST_SLOT].isEmpty()) {
                requestCommonDimensions.setUmpHostSlot((byte) Integer.parseInt(csvRecord[POSTSERVE_UMP_HOST_SLOT]));
            }

            if (null != csvRecord[POSTSERVE_DC] && !csvRecord[POSTSERVE_DC].isEmpty()) {
                requestCommonDimensions.setDc(DataCentre.findByValue(Integer.parseInt(csvRecord[POSTSERVE_DC])));
            }
            billedEvent.setRequestCommonDimensions(ThriftEventParser.serializeEvent(requestCommonDimensions));
        }
    }

    // ... the impression and bid dimensions are populated in the same way (omitted here) ...

    return billedEvent;
}
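The local cache map itself is not shown above; the following is a hypothetical sketch of what it can look like (class and method names are illustrative, not the actual implementation).

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical local cache: the generated input event and its expected output
// measure are keyed by a test-case id so that the validator can later compare
// them against what the streaming job wrote to the Delta table.
public class LocalCacheMap {
    private static final Map<String, BilledEvent> INPUTS = new ConcurrentHashMap<>();
    private static final Map<String, Double> EXPECTED_OUTPUTS = new ConcurrentHashMap<>();

    public static void put(String testCaseId, BilledEvent input, double expectedMeasure) {
        INPUTS.put(testCaseId, input);
        EXPECTED_OUTPUTS.put(testCaseId, expectedMeasure);
    }

    public static BilledEvent inputFor(String testCaseId) {
        return INPUTS.get(testCaseId);
    }

    public static Double expectedFor(String testCaseId) {
        return EXPECTED_OUTPUTS.get(testCaseId);
    }
}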

EventHub Client Sink Factory

It creates the event client based on the input configuration.

public static CompletableFuture<EventHubClient> createEventHubClient(String configSubSet) throws Exception {
    // Resolve the Event Hub sink configuration for the given config subset
    // and build the connection string from it.
    EventHubSinkConfig eventHubRegression = (new EventHubSinkUtil(configSubSet)).getSinkConfig(configSubSet);
    String connectionString = EventHubSinkUtil.getConnectionString(eventHubRegression);

    // The client is created asynchronously on a dedicated single-thread scheduler.
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
    return EventHubClient.createFromConnectionString(connectionString, pool);
}
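A possible usage of this factory, assuming the legacy azure-eventhubs SDK shown above: once the future completes, the serialized thrift test events are sent to the Event Hub that feeds the regression streaming job. The config subset name and the payload variable below are placeholders.

// Send a serialized thrift test event to the regression Event Hub.
EventHubClient client = createEventHubClient("regression.eventhub").get();
client.sendSync(EventData.create(serializedBilledEvent)); // serializedBilledEvent: byte[]
client.closeSync();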

Delta Table Generator and Validator

It generates a Delta table for the regression run, which the validator can query to get the output.

The validator then compares the query result from the Delta table against the local cache map that was populated while generating the test cases.
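A minimal sketch of what that comparison can look like, assuming an output Delta table keyed by a test-case id and a single measure column (the table path and column names are assumptions, not our actual schema):

import java.util.Map;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RegressionValidator {
    // Compare the measures produced by the regression run against the expected
    // values cached by the test-case generator. Table path and column names
    // (test_case_id, billed_amount) are illustrative.
    public static boolean validate(SparkSession spark, String regressionRunUUID,
                                   Map<String, Double> expectedByTestCase) {
        Dataset<Row> actual = spark.read()
                .format("delta")
                .load("/mnt/delta/regression_output/" + regressionRunUUID);

        return actual.collectAsList().stream().allMatch(row -> {
            double expected = expectedByTestCase.get(row.getString(row.fieldIndex("test_case_id")));
            double produced = row.getDouble(row.fieldIndex("billed_amount"));
            return Math.abs(expected - produced) < 1e-9;
        });
    }
}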

Based on the validation result, it generates a success or a failure marker file.

public static void generateLogFile(boolean success, String configPath, String regressionRunUUID) {
    Configuration config = ConfigurationUtil.getConfig(configPath);

    // Marker files live under <regression-log-path>/<regression-run-id>/.
    StringBuilder regressionLogPath = new StringBuilder(
            config.getString(DatagenConstants.REGRESSION_LOG_PATH_CONFIG) + regressionRunUUID + "/");
    StringBuilder regressionFileName = new StringBuilder();
    if (success) {
        regressionFileName.append(regressionLogPath).append(DatagenConstants.SUCCESS_FILE_NAME);
    } else {
        regressionFileName.append(regressionLogPath).append(DatagenConstants.FAILED_FILE_NAME);
    }

    // Write an empty marker file that the CI pipeline later checks for.
    DBUtilsHolder.dbutils().fs().put(regressionFileName.toString(), "", true);
    log.info("Regression File Name {}", regressionFileName);
}

CI/CD Integration

Releasing software isn’t just an art, it’s an engineering discipline too.

Continuous Integration

Once the code written by the developers is checked and passes, the software is essentially ready for deployment. We have integrated the pipeline in Harness with Databricks to run the regression for the streaming job.


Regression Job Harness Pipeline

Step 1: Configuration and Regression Run ID Generation

  • Generate the Regression Run ID.

  • Fetch the regression configuration and the infrastructure configuration.

Step 2: Regression Databricks Cluster Creation

  • Prepare cluster creation request body.

  • Create a Databricks cluster.

  • Verify the cluster creation.

Step 3: Regression Databricks Job Creation

  • Prepare the regression job request body.

  • Create the regression workflow job on Databricks.

  • Check the regression status by looking in the storage account for the success/failure file for the Regression Run ID.

Step 4: Regression Resources Cleanup

  • Delete regression job.

  • Delete regression cluster.

Databricks Cluster APIs

  • Create a Cluster: POST 2.0/clusters/create

  • Verify a Cluster: GET 2.0/clusters/get

  • Delete a Cluster: POST 2.0/clusters/delete
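For illustration, the pipeline steps can call these endpoints with any HTTP client. Below is a sketch using java.net.http; the workspace URL, token, and cluster settings are placeholders rather than our actual pipeline configuration.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DatabricksClusterApi {
    public static String createCluster(String workspaceUrl, String token) throws Exception {
        // Minimal cluster-create request body; Spark version, node type and
        // worker count are placeholders.
        String body = "{\"cluster_name\":\"regression-cluster\","
                + "\"spark_version\":\"11.3.x-scala2.12\","
                + "\"node_type_id\":\"Standard_DS3_v2\","
                + "\"num_workers\":2}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(workspaceUrl + "/api/2.0/clusters/create"))
                .header("Authorization", "Bearer " + token)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // The response JSON contains the cluster_id used by the later steps.
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }
}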

Databricks Jobs APIs

  • Create a Job: POST 2.0/jobs/create

  • Run a Job: POST 2.0/jobs/run-now

  • Delete a Job: POST 2.0/jobs/delete
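Similarly, continuing the sketch above (same placeholder workspaceUrl and token, and an illustrative job_id), a one-time run of the regression job can be triggered with 2.0/jobs/run-now:

// Trigger the regression job that was created earlier via 2.0/jobs/create.
HttpRequest runNow = HttpRequest.newBuilder()
        .uri(URI.create(workspaceUrl + "/api/2.0/jobs/run-now"))
        .header("Authorization", "Bearer " + token)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString("{\"job_id\": 12345}"))
        .build();

HttpResponse<String> runResponse = HttpClient.newHttpClient()
        .send(runNow, HttpResponse.BodyHandlers.ofString());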

Final Thoughts

  • With the regression and CI/CD in place, we are able to validate and deploy the Streaming Job on Databricks in less than 5 minutes.

  • With regression in place, we can validate the Streaming Job without requiring a feature pilot for the Streaming Jobs to be in place.

With automation, developers can release new features and change more frequently, while teams will have better overall stability.

I hope that after reading this article you have a fair understanding of the streaming job, its regression, and its integration with Harness for the CI/CD pipeline.