Data Quality Validation - Ensuring Your Data is Trustworthy

6 min read
Tom Fynes
Data Engineer @ OptumUK

Bad data leads to bad decisions. As data engineers, one of our most important jobs is ensuring data quality. Let's explore how to validate and maintain high-quality data!

Why Data Quality Matters

Imagine your CEO making a million-dollar decision based on a dashboard... that's pulling from corrupted data. Scary, right? Data quality isn't just a nice-to-have - it's essential for:

  • Accurate analytics and reporting
  • Reliable machine learning models
  • Regulatory compliance
  • Customer trust

The Six Dimensions of Data Quality

When we talk about data quality, we're really talking about six key dimensions (a quick pandas sketch of a few of them follows this list):

Accuracy: Is the data correct?
Completeness: Are we missing any data?
Consistency: Does data match across systems?
Timeliness: Is the data up-to-date?
Validity: Does data follow the rules?
Uniqueness: Are there duplicates?
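
As a quick illustration, here's roughly how a few of these dimensions translate into one-line pandas checks (the tiny DataFrame and column names are made up for this sketch). Accuracy and consistency are harder to automate because they usually need a trusted reference dataset or a second system to compare against.

import pandas as pd

df = pd.DataFrame({
    'customer_id': [1, 2, 2, 4],
    'email': ['a@example.com', None, 'b@example.com', 'not-an-email'],
    'created_at': pd.to_datetime(['2024-01-01', '2024-01-02', '2024-01-02', '2024-01-03']),
})

# Completeness: share of non-null emails
completeness = df['email'].notna().mean()

# Uniqueness: number of duplicated customer IDs
duplicates = df['customer_id'].duplicated().sum()

# Validity: share of emails matching a simple pattern
validity = df['email'].str.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', na=False).mean()

# Timeliness: age of the newest record
age = pd.Timestamp.now() - df['created_at'].max()

print(completeness, duplicates, validity, age)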

Building Quality Checks with Python

Let's build a simple data quality framework using pandas; we'll bring in Great Expectations afterwards:

import pandas as pd
from datetime import datetime

class DataQualityChecker:
    """A simple data quality validation framework"""

    def __init__(self, df):
        self.df = df
        self.issues = []

    def check_completeness(self, columns, threshold=0.95):
        """Check if columns have sufficient non-null values"""
        for col in columns:
            non_null_pct = self.df[col].notna().sum() / len(self.df)
            if non_null_pct < threshold:
                self.issues.append({
                    'check': 'completeness',
                    'column': col,
                    'message': f'{col} is only {non_null_pct:.1%} complete'
                })
        return self

    def check_uniqueness(self, columns):
        """Check for duplicate values"""
        for col in columns:
            duplicates = self.df[col].duplicated().sum()
            if duplicates > 0:
                self.issues.append({
                    'check': 'uniqueness',
                    'column': col,
                    'message': f'{col} has {duplicates} duplicates'
                })
        return self

    def check_range(self, column, min_val, max_val):
        """Check if values are within expected range"""
        out_of_range = self.df[
            (self.df[column] < min_val) |
            (self.df[column] > max_val)
        ]
        if len(out_of_range) > 0:
            self.issues.append({
                'check': 'range',
                'column': column,
                'message': f'{len(out_of_range)} values outside range [{min_val}, {max_val}]'
            })
        return self

    def check_freshness(self, timestamp_col, max_age_hours=24):
        """Check if data is recent enough"""
        self.df[timestamp_col] = pd.to_datetime(self.df[timestamp_col])
        latest = self.df[timestamp_col].max()
        age_hours = (datetime.now() - latest).total_seconds() / 3600

        if age_hours > max_age_hours:
            self.issues.append({
                'check': 'freshness',
                'column': timestamp_col,
                'message': f'Newest record is {age_hours:.1f} hours old (limit: {max_age_hours}h)'
            })
        return self

    def check_pattern(self, column, pattern, description):
        """Check if values match a regex pattern"""
        invalid = self.df[~self.df[column].astype(str).str.match(pattern)]
        if len(invalid) > 0:
            self.issues.append({
                'check': 'pattern',
                'column': column,
                'message': f"{len(invalid)} values don't match {description}"
            })
        return self

    def get_report(self):
        """Generate a quality report"""
        if not self.issues:
            return "✅ All quality checks passed!"

        report = f"❌ Found {len(self.issues)} data quality issues:\n\n"
        for issue in self.issues:
            report += f"- [{issue['check']}] {issue['column']}: {issue['message']}\n"
        return report

# Example usage
df = pd.read_csv('customer_data.csv')

quality_report = (
    DataQualityChecker(df)
    .check_completeness(['email', 'name', 'phone'])
    .check_uniqueness(['customer_id', 'email'])
    .check_range('age', min_val=0, max_val=120)
    .check_pattern('email', r'^[\w\.-]+@[\w\.-]+\.\w+$', 'email format')
    .check_freshness('created_at', max_age_hours=48)
    .get_report()
)

print(quality_report)

Using Great Expectations

For production systems, Great Expectations is the industry standard:

import great_expectations as gx

# Initialize context
context = gx.get_context()

# Create expectation suite
suite = context.add_expectation_suite("customer_data_suite")

# Add expectations
validator = context.sources.pandas_default.read_csv('customer_data.csv')

# Expect columns to exist
validator.expect_table_columns_to_match_ordered_list([
    'customer_id', 'name', 'email', 'age', 'created_at'
])

# Expect no nulls in critical columns
validator.expect_column_values_to_not_be_null('customer_id')
validator.expect_column_values_to_not_be_null('email')

# Expect unique values
validator.expect_column_values_to_be_unique('customer_id')
validator.expect_column_values_to_be_unique('email')

# Expect values in range
validator.expect_column_values_to_be_between(
    'age',
    min_value=0,
    max_value=120
)

# Expect specific format
validator.expect_column_values_to_match_regex(
    'email',
    regex=r'^[\w\.-]+@[\w\.-]+\.\w+$'
)

# Run validation
results = validator.validate()

if results.success:
    print("✅ All expectations met!")
else:
    print("❌ Validation failed:")
    for result in results.results:
        if not result.success:
            print(f" - {result.expectation_config.expectation_type}")

SQL-Based Quality Checks

For data in databases, SQL checks are often simpler:

-- Check for completeness
SELECT
    COUNT(*) as total_rows,
    COUNT(customer_id) as non_null_customer_ids,
    COUNT(email) as non_null_emails,
    (COUNT(email) * 100.0 / COUNT(*)) as email_completeness_pct
FROM customers;

-- Check for duplicates
SELECT
    email,
    COUNT(*) as count
FROM customers
GROUP BY email
HAVING COUNT(*) > 1;

-- Check for outliers: profile the column and count out-of-range values
SELECT
    MIN(age) as min_age,
    MAX(age) as max_age,
    AVG(age) as avg_age,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY age) as p99_age,
    COUNT(*) FILTER (WHERE age NOT BETWEEN 0 AND 120) as out_of_range_count
FROM customers;

-- Check for freshness
SELECT
    MAX(created_at) as latest_record,
    EXTRACT(EPOCH FROM (NOW() - MAX(created_at))) / 3600 as hours_since_last
FROM customers;

-- Check referential integrity
SELECT COUNT(*)
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.customer_id
WHERE c.customer_id IS NULL;
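
These queries can also be run on a schedule from Python. Here's a minimal sketch, assuming a PostgreSQL database reachable via psycopg2; the connection string and check names are placeholders, and each check is written to return a single count where anything non-zero signals a problem.

import psycopg2

# Placeholder DSN - swap in your real connection details
conn = psycopg2.connect("dbname=analytics user=etl password=secret host=localhost")

CHECKS = {
    # Each query returns one number; non-zero means a problem
    'duplicate_emails': """
        SELECT COUNT(*) FROM (
            SELECT email FROM customers GROUP BY email HAVING COUNT(*) > 1
        ) d
    """,
    'orphaned_orders': """
        SELECT COUNT(*)
        FROM orders o
        LEFT JOIN customers c ON o.customer_id = c.customer_id
        WHERE c.customer_id IS NULL
    """,
}

issues = []
with conn.cursor() as cur:
    for name, sql in CHECKS.items():
        cur.execute(sql)
        count = cur.fetchone()[0]
        if count > 0:
            issues.append(f"{name}: {count} offending rows")

if issues:
    print("❌ SQL quality checks failed:", *issues, sep="\n- ")
else:
    print("✅ SQL quality checks passed")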

Integrating Quality Checks into Pipelines

Here's how to add quality checks to your Airflow DAG:

import pandas as pd

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator

def validate_data(**context):
    """Run data quality checks"""
    df = pd.read_csv('/data/latest.csv')

    checker = DataQualityChecker(df)
    checker.check_completeness(['id', 'email'])
    checker.check_uniqueness(['id'])

    if checker.issues:
        # Push issues to XCom so the alert task can report them
        context['ti'].xcom_push(key='quality_issues', value=checker.issues)
        return 'send_alert'  # Branch to alert task
    else:
        return 'load_to_warehouse'  # Continue pipeline

# extract_data, load_data and send_quality_alert are defined elsewhere
with DAG('etl_with_quality_checks', ...) as dag:

    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data
    )

    validate = BranchPythonOperator(
        task_id='validate_data',
        python_callable=validate_data
    )

    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_data
    )

    alert = PythonOperator(
        task_id='send_alert',
        python_callable=send_quality_alert
    )

    extract >> validate >> [load, alert]

Monitoring Data Quality Over Time

Track quality metrics to spot trends:

import pandas as pd
import matplotlib.pyplot as plt

def track_quality_metrics(df, timestamp_col):
    """Track quality metrics over time"""
    df[timestamp_col] = pd.to_datetime(df[timestamp_col])
    df['date'] = df[timestamp_col].dt.date

    # Calculate daily metrics
    daily_metrics = df.groupby('date').agg({
        'customer_id': 'count',
        'email': lambda x: x.notna().sum() / len(x),               # Completeness
        'age': lambda x: ((x >= 0) & (x <= 120)).sum() / len(x)    # Validity
    }).rename(columns={
        'customer_id': 'record_count',
        'email': 'email_completeness',
        'age': 'age_validity'
    })

    return daily_metrics

# Visualize trends
metrics = track_quality_metrics(df, 'created_at')
metrics[['email_completeness', 'age_validity']].plot(
    title='Data Quality Trends',
    ylabel='Quality Score',
    ylim=[0, 1]
)
plt.show()
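
To turn those trends into alerts, compare the most recent day against your SLA thresholds. Here's a minimal sketch (the threshold values are illustrative; swap the print for your real alerting hook, such as the send_quality_alert callable used in the DAG above):

# Illustrative SLA thresholds per metric
THRESHOLDS = {
    'email_completeness': 0.95,
    'age_validity': 0.99,
}

latest = metrics.iloc[-1]  # most recent day's metrics

breaches = {
    metric: latest[metric]
    for metric, threshold in THRESHOLDS.items()
    if latest[metric] < threshold
}

if breaches:
    # Replace with your real alerting hook (Slack, PagerDuty, email, ...)
    print(f"⚠️ Quality SLA breached: {breaches}")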

Best Practices

Start with critical fields: Focus on the data that matters most for your business

Automate everything: Quality checks should run automatically with every pipeline

Set SLAs: Define acceptable thresholds (e.g., "99% completeness on email field")

Alert strategically: Don't cry wolf - alert only on actionable issues

Document expectations: Make it clear what "good data" looks like

# Example: Document expectations
QUALITY_RULES = {
    'customer_id': {
        'nullable': False,
        'unique': True,
        'type': 'integer'
    },
    'email': {
        'nullable': False,
        'unique': True,
        'pattern': r'^[\w\.-]+@[\w\.-]+\.\w+$',
        'min_length': 5
    },
    'age': {
        'nullable': True,
        'min_value': 0,
        'max_value': 120,
        'type': 'integer'
    }
}
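
One way to keep documented expectations and the actual checks in sync is to drive the checker from the rules themselves. Here's a sketch using the DataQualityChecker from earlier; the run_documented_checks helper is illustrative, and it only covers the rule keys our checker knows about (nullable, unique, min/max values, pattern).

def run_documented_checks(df, rules):
    """Translate documented rules into DataQualityChecker calls."""
    checker = DataQualityChecker(df)

    non_nullable = [col for col, rule in rules.items() if not rule.get('nullable', True)]
    unique_cols = [col for col, rule in rules.items() if rule.get('unique')]

    if non_nullable:
        checker.check_completeness(non_nullable, threshold=1.0)
    if unique_cols:
        checker.check_uniqueness(unique_cols)

    for col, rule in rules.items():
        if 'min_value' in rule and 'max_value' in rule:
            checker.check_range(col, rule['min_value'], rule['max_value'])
        if 'pattern' in rule:
            checker.check_pattern(col, rule['pattern'], f'{col} format')

    return checker.get_report()

print(run_documented_checks(df, QUALITY_RULES))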

Wrapping Up

Data quality is not a one-time task - it's an ongoing commitment. Build quality checks into your pipelines from day one, and you'll save yourself countless headaches down the road.