How to Test PySpark ETL Data Pipeline

by Edwin Tan | Dec 2022
Validate a big data pipeline with Great Expectations

Photo by Erlend Ekseth on Unsplash

Garbage in, garbage out is a common expression used to emphasize the importance of data quality for tasks such as machine learning, data analytics and business intelligence. With the increasing amount of data being created and stored, building high-quality data pipelines has never been more challenging.

PySpark is a commonly used tool for building ETL pipelines for large datasets. A common question that arises while building a data pipeline is: “How do we know that our data pipeline is transforming the data in the way that is intended?” To answer this question, we borrow the idea of unit testing from the software development paradigm. The purpose of a unit test is to validate that each component of the code performs as expected, by checking whether the output meets expectations. In similar fashion, we can validate that our data pipeline is working as intended by writing tests against the output data.

In this article we will walk through an example of how to leverage Great Expectations to validate your PySpark data pipeline.

This example uses the following setup:

  1. PySpark
  2. Great Expectations==0.15.34
  3. Databricks notebook

We will be using a Databricks notebook in Databricks Community Edition. However, you are free to use any integrated development environment and any cloud or local Spark cluster.
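
If Great Expectations is not already installed on your cluster, you can install the pinned version at the top of the notebook. For instance, in Databricks the %pip magic performs a notebook-scoped install:

%pip install great_expectations==0.15.34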

We are using the UCI bank marketing dataset [2], which contains information on a direct marketing campaign of a Portuguese bank. Here’s what the data looks like.

Image by author.

Understanding the data is paramount to creating comprehensive tests for your PySpark data pipeline. Common considerations when it comes to data quality include, but are not limited to:

  • Completeness
  • Consistency
  • Correctness
  • Uniqueness

The acceptable quality level for each of the above-mentioned items depends heavily on the context and use case.

In this section, we explore and create expectations which will be used in a later section to test new, unseen data.

So what are expectations? Expectations are assertions about the data. As the name implies, we are validating whether the data is what we expect it to be. Great Expectations comes with predefined expectations for common data quality checks. Here are some examples of predefined expectations.

expect_column_values_to_not_be_null
expect_column_values_to_be_unique
expect_column_values_to_match_regex
expect_column_median_to_be_between
expect_table_row_count_to_be_between

The names of these expectations are rather descriptive of what they check. If the predefined expectations do not fit your needs, Great Expectations also allows you to create custom expectations, as sketched below.
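
For illustration, here is a minimal sketch of a custom column-level expectation built on the same legacy dataset API used in this article. It assumes the MetaSparkDFDataset.column_map_expectation decorator contract, in which the decorated method receives a single-column DataFrame and returns it with an appended boolean "__success" column; the expectation name and the lowercase check are invented for this example.

from great_expectations.dataset.sparkdf_dataset import MetaSparkDFDataset, SparkDFDataset
from pyspark.sql import functions as f

class CustomSparkDFDataset(SparkDFDataset):
    # the extra keyword arguments mirror the signatures of the built-in expectations
    @MetaSparkDFDataset.column_map_expectation
    def expect_column_values_to_be_lowercase(self, column, mostly=None, result_format=None,
                                             include_config=True, catch_exceptions=None, meta=None):
        # assumption: column arrives as a single-column DataFrame and column[0] refers to it
        return column.withColumn("__success", column[0] == f.lower(column[0]))

To use it, wrap the DataFrame with the custom class instead of SparkDFDataset, e.g. CustomSparkDFDataset(df).expect_column_values_to_be_lowercase('job').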

Imports

import great_expectations as ge
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
from pyspark.sql import functions as f, Window
import json

Load the data

We load the data from a CSV file and perform some processing steps on the dataset:

  • Replace the “unknown” value in the job column with null
  • Create an “id” column which contains a unique identifier for each row

df = spark \
    .read \
    .format('csv') \
    .option('header', True) \
    .load('/FileStore/tables/bank_full.csv', sep=';') \
    .withColumn('job', f.when(f.col('job') == 'unknown', f.lit(None)).otherwise(f.col('job'))) \
    .withColumn('id', f.monotonically_increasing_id())

SparkDFDataset is a thin wrapper around the PySpark DataFrame that lets us call Great Expectations methods directly on it.

gdf = SparkDFDataset(df)
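
The wrapper does not copy the data; in the legacy API the underlying DataFrame remains accessible through the spark_df attribute (an assumption worth verifying against your Great Expectations version), so a quick peek is still easy:

gdf.spark_df.show(5)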

Check column names

Let’s validate that the DataFrame contains the correct set of columns by providing the list of expected columns to the expect_table_columns_to_match_set method.

expected_columns = ['age', 'job', 'marital',
                    'education', 'default', 'balance',
                    'housing', 'loan', 'contact',
                    'day', 'month', 'duration',
                    'campaign', 'pdays', 'previous',
                    'poutcome', 'y']
gdf.expect_table_columns_to_match_set(column_set=expected_columns)

Running the code above returns the output below. "success": true indicates that the test has passed.

# output
{
  "result": {
    "observed_value": [
      "age", "job", "marital", "education", "default", "balance",
      "housing", "loan", "contact", "day", "month", "duration",
      "campaign", "pdays", "previous", "poutcome", "y"
    ]
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "meta": {},
  "success": true
}
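
Because each expectation call returns a result object, the check can also be turned into a hard assertion in the style of a unit test; a minimal sketch:

result = gdf.expect_table_columns_to_match_set(column_set=expected_columns)
assert result.success, 'DataFrame columns do not match the expected schema'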

Check values in categorical column

We can check if a categorical column contains unexpected data by using the expect_column_values_to_be_in_set method. We expect the marital column to contain only the following values: single, married and divorced.

gdf.expect_column_values_to_be_in_set(column = 'marital', value_set = {'single', 'married', 'divorced'})

Great Expectations will fail the check if the marital column contains any values that are not found in the value set.

# output
{
  "result": {
    "element_count": 45211,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "meta": {},
  "success": true
}
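
If a small amount of contamination is tolerable, the predefined expectations accept a mostly argument: the check passes as long as at least that fraction of values conforms. For example:

# passes if at least 95% of the values fall within the value set
gdf.expect_column_values_to_be_in_set(
    column='marital',
    value_set={'single', 'married', 'divorced'},
    mostly=0.95
)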

Check column does not contain null

If we expect a column to not contain any null values, we can use the expect_column_values_to_not_be_null method and specify the column of interest as an argument.

gdf.expect_column_values_to_not_be_null(column = 'job')

In this case, the job column fails the check as there are null values in the column.

{
  "result": {
    "element_count": 45211,
    "unexpected_count": 288,
    "unexpected_percent": 0.6370131162770122,
    "unexpected_percent_total": 0.6370131162770122,
    "partial_unexpected_list": [null, null, null, null, null, null, null, null, null, null,
                                null, null, null, null, null, null, null, null, null, null]
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "meta": {},
  "success": false
}
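
The result dictionary also quantifies how bad the problem is, which is handy for logging; the field names below match the output above:

res = gdf.expect_column_values_to_not_be_null(column='job')
print(res.result['unexpected_count'])    # 288
print(res.result['unexpected_percent'])  # ~0.64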

Check uniqueness in column

Great Expectations also provides an out-of-the-box method to check that the values in a given column are unique. Let’s check if the id column contains unique values.

gdf.expect_column_values_to_be_unique('id')

As we expected, the column contains unique values.

# output
{
  "result": {
    "element_count": 45211,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "meta": {},
  "success": true
}
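
As a sanity check outside Great Expectations, the same property can be verified with plain PySpark:

# count ids that occur more than once; zero duplicates means the column is unique
duplicate_ids = df.groupBy('id').count().filter(f.col('count') > 1).count()
assert duplicate_ids == 0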

Now that we have created various expectations, we can put them all together in an expectation suite.

expectation_suite = gdf.get_expectation_suite(discard_failed_expectations=False)

The expectation suite is nothing more than a collection of expectations.

# expectation_suite
{
  "expectations": [
    {
      "kwargs": {
        "column_set": [
          "age", "job", "marital", "education", "default", "balance",
          "housing", "loan", "contact", "day", "month", "duration",
          "campaign", "pdays", "previous", "poutcome", "y"
        ]
      },
      "expectation_type": "expect_table_columns_to_match_set",
      "meta": {}
    },
    {
      "kwargs": {
        "column": "marital",
        "value_set": ["married", "divorced", "single"]
      },
      "expectation_type": "expect_column_values_to_be_in_set",
      "meta": {}
    },
    {
      "kwargs": {
        "column": "job"
      },
      "expectation_type": "expect_column_values_to_not_be_null",
      "meta": {}
    },
    {
      "kwargs": {
        "column": "id"
      },
      "expectation_type": "expect_column_values_to_be_unique",
      "meta": {}
    }
  ],
  "data_asset_type": "Dataset",
  "meta": {
    "great_expectations_version": "0.15.34"
  },
  "expectation_suite_name": "default",
  "ge_cloud_id": null
}

Let’s save the expectation suite in JSON format.

# save expectation suite
with open('my_expectation_suite.json', 'w') as my_file:
    my_file.write(
        json.dumps(expectation_suite.to_json_dict())
    )
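
Note that open() writes to the driver’s local filesystem. On Databricks, if the suite needs to outlive the cluster or be read from a different cluster, one option is to write under /dbfs so that the file lands in DBFS; the path below is an assumption, so adjust it to your workspace:

# assumption: the /dbfs FUSE mount is available in your workspace
with open('/dbfs/FileStore/my_expectation_suite.json', 'w') as my_file:
    my_file.write(json.dumps(expectation_suite.to_json_dict()))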

Now assume that we have a new set of data that requires checking. We use the previously saved expectation suite to perform data quality checks on the unseen data in a new notebook.

Imports

import great_expectations as ge
from great_expectations.dataset.sparkdf_dataset import SparkDFDataset
import pyspark
from pyspark.sql import functions as f, Window
import json

Load the data

df = spark \
    .read \
    .format('csv') \
    .option('header', True) \
    .load('/FileStore/tables/bank_full_new.csv', sep=';') \
    .withColumn('job', f.when(f.col('job') == 'unknown', f.lit(None)).otherwise(f.col('job'))) \
    .withColumn('id', f.monotonically_increasing_id())

We create two functions to (a) load the expectation suite and (b) validate the data against the expectation suite.

def load_expectation_suite(path: str) -> dict:
    """Load an expectation suite stored in JSON format
    and convert it into a dictionary.

    Args:
        path (str): path to the expectation suite JSON file

    Returns:
        dict: expectation suite
    """
    # use a distinct name for the file handle so it does not shadow
    # the pyspark.sql.functions alias f imported above
    with open(path, 'r') as json_file:
        expectation_suite = json.load(json_file)
    return expectation_suite

def great_expectation_validation(df: pyspark.sql.DataFrame,
                                 expectation_suite_path: str) -> dict:
    """Run validation on a DataFrame based on an expectation suite.

    Args:
        df (pyspark.sql.DataFrame): DataFrame to validate
        expectation_suite_path (str): path to the expectation suite JSON file

    Returns:
        dict: validation result
    """
    expectation_suite = load_expectation_suite(expectation_suite_path)
    gdf = SparkDFDataset(df)
    validation_results = gdf.validate(
        expectation_suite=expectation_suite,
        result_format='SUMMARY',
        catch_exceptions=True
    )
    return validation_results

Run the validation

validation_result = great_expectation_validation(
    df=df,
    expectation_suite_path='my_expectation_suite.json'
)

Perform the check

False indicates that the validation failed, as at least one expectation in the expectation suite was not met.

validation_result['success']

# output:
# False

Great Expectations also reports the number of successful and failed expectations.

validation_result['statistics']

#output:
#{'evaluated_expectations': 4,
# 'successful_expectations': 2,
# 'unsuccessful_expectations': 2,
# 'success_percent': 50.0}
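
In a scheduled job you typically want a failed validation to halt the pipeline rather than let bad data flow downstream; a minimal sketch using the keys shown above:

if not validation_result['success']:
    stats = validation_result['statistics']
    raise ValueError(
        f"Data quality validation failed: "
        f"{stats['unsuccessful_expectations']} of "
        f"{stats['evaluated_expectations']} expectations not met"
    )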

In this article, we discussed the importance of testing your data pipeline and walked through an example of how we can leverage Great Expectations to create a comprehensive suite of tests.

[1] Great Expectations Home Page, https://greatexpectations.io

[2] Moro, S., Rita, P. & Cortez, P. (2012). Bank Marketing. UCI Machine Learning Repository. CC BY 4.0.

