Comprehensive Data Validation Pipeline
Validate data completeness, accuracy, and consistency
from seerpy import Seer
import pandas as pd
from datetime import datetime
# SEER client; its monitor() context manager wraps the validation run at the
# end of this example. 'YOUR_API_KEY' is a placeholder value.
seer = Seer(apiKey='YOUR_API_KEY')
class DataValidator:
    """Collect validation errors and warnings for a pandas DataFrame.

    Each ``check_*`` method inspects ``self.df`` and appends human-readable
    messages to ``self.errors`` (hard failures) or ``self.warnings``
    (advisories). Columns that a check is asked about but that are absent
    from the frame are skipped by every check except
    ``check_completeness``, which is the one place they are reported.
    """

    def __init__(self, df):
        """Store the frame to validate and start with empty result lists."""
        self.df = df
        self.errors = []
        self.warnings = []

    def check_completeness(self, required_columns):
        """Record an error if any required column is missing.

        Returns True when every column in ``required_columns`` is present.
        """
        present = set(self.df.columns)
        # Keep the caller's order (de-duplicated) so the message is
        # deterministic; formatting a raw set would vary between runs.
        missing = [
            col for col in dict.fromkeys(required_columns)
            if col not in present
        ]
        if missing:
            listed = ", ".join(repr(col) for col in missing)
            self.errors.append(f"Missing required columns: {{{listed}}}")
        return not missing

    def check_nulls(self, columns, threshold=0.05):
        """Warn for each column whose null fraction exceeds ``threshold``.

        Columns absent from the frame are skipped, and an empty frame
        produces no warnings.
        """
        total = len(self.df)
        if total == 0:
            # No rows means no null fraction to compute (avoids 0/0).
            return
        for col in columns:
            if col not in self.df.columns:
                continue
            null_pct = self.df[col].isnull().sum() / total
            if null_pct > threshold:
                self.warnings.append(
                    f"Column '{col}' has {null_pct*100:.1f}% null values"
                )

    def check_duplicates(self, subset=None):
        """Warn about duplicated rows and return how many were found.

        ``subset`` is forwarded to ``DataFrame.duplicated``; ``None`` means
        all columns are compared.
        """
        # int() so callers get a plain Python int rather than a numpy scalar.
        dupes = int(self.df.duplicated(subset=subset).sum())
        if dupes > 0:
            self.warnings.append(f"Found {dupes} duplicate records")
        return dupes

    def check_data_types(self, type_map):
        """Record an error for each column whose dtype name differs.

        ``type_map`` maps column name to the expected ``str(dtype)`` value,
        e.g. ``'int64'``. Columns absent from the frame are skipped.
        """
        for col, expected_type in type_map.items():
            if col not in self.df.columns:
                continue
            actual_type = self.df[col].dtype
            if str(actual_type) != expected_type:
                self.errors.append(
                    f"Column '{col}' has type {actual_type}, expected {expected_type}"
                )

    def check_ranges(self, range_map):
        """Warn for each column with values outside its inclusive range.

        ``range_map`` maps column name to ``(min_val, max_val)``. Null
        values compare as in-range. Columns absent from the frame are
        skipped.
        """
        for col, (min_val, max_val) in range_map.items():
            if col not in self.df.columns:
                continue
            values = self.df[col]
            out_of_range = int(((values < min_val) | (values > max_val)).sum())
            if out_of_range > 0:
                self.warnings.append(
                    f"Column '{col}' has {out_of_range} values outside range [{min_val}, {max_val}]"
                )

    def check_dates(self, date_columns):
        """Warn for each date column that contains timestamps in the future.

        Columns that are not already datetime-typed (for example strings
        straight from ``read_csv``) are parsed as UTC first, with
        unparseable entries treated as missing. Columns absent from the
        frame are skipped.
        """
        for col in date_columns:
            if col not in self.df.columns:
                continue
            values = self.df[col]
            if not pd.api.types.is_datetime64_any_dtype(values):
                values = pd.to_datetime(values, errors='coerce', utc=True)
            # "Now" must share the column's timezone-awareness; pandas
            # refuses to compare tz-naive with tz-aware timestamps.
            now = pd.Timestamp.now(tz=values.dt.tz)
            future_dates = int((values > now).sum())
            if future_dates > 0:
                self.warnings.append(
                    f"Column '{col}' has {future_dates} future dates"
                )

    def generate_report(self):
        """Return a dict summarising the frame size and collected messages.

        ``passed`` is True when no errors were recorded; warnings do not
        affect it.
        """
        report = {
            'total_records': len(self.df),
            'total_columns': len(self.df.columns),
            'errors': self.errors,
            'warnings': self.warnings,
            'passed': len(self.errors) == 0
        }
        return report
def validate_sales_data(df):
    """Validate a sales DataFrame against the business rules below.

    Runs completeness, null, duplicate, dtype, range and date checks through
    a ``DataValidator`` and returns its report dict (the result of
    ``generate_report()``).
    """
    validator = DataValidator(df)

    # Check required columns
    required_cols = ['order_id', 'customer_id', 'amount', 'date']
    validator.check_completeness(required_cols)

    # Check for nulls. Only columns that are actually present are passed on:
    # absent ones were already reported by the completeness check, and the
    # function should return a failed report rather than raise on a lookup.
    null_check_cols = [
        col for col in ('customer_id', 'amount', 'date') if col in df.columns
    ]
    validator.check_nulls(null_check_cols)

    # Check duplicates
    validator.check_duplicates(subset=['order_id'])

    # Check data types
    type_map = {
        'order_id': 'int64',
        'amount': 'float64',
        'date': 'datetime64[ns]'
    }
    validator.check_data_types(type_map)

    # Check value ranges
    range_map = {
        'amount': (0, 1000000),  # Amount between $0 and $1M
        'quantity': (1, 1000)  # Quantity between 1 and 1000
    }
    validator.check_ranges(range_map)

    # Check dates
    validator.check_dates(['date'])

    return validator.generate_report()
# Monitor validation process: load the daily sales CSV, validate it, print a
# summary, and raise inside the monitor if any validation error was recorded.
with seer.monitor("daily-data-validation", capture_logs=True):
    # Load data
    df = pd.read_csv('daily_sales.csv')
    # read_csv leaves 'date' as plain strings, which can never satisfy the
    # 'datetime64[ns]' type rule and cannot be compared with a timestamp.
    # Parse it here; unparseable entries become NaT and are then counted by
    # the null check. The membership guard keeps a missing column a
    # validation error instead of a KeyError at load time.
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'], errors='coerce')
    print(f"Loaded {len(df)} records")

    # Validate data
    report = validate_sales_data(df)

    # Log results
    print("Validation Results:")
    print(f" Total Records: {report['total_records']}")
    print(f" Errors: {len(report['errors'])}")
    print(f" Warnings: {len(report['warnings'])}")
    if report['errors']:
        print("\nErrors:")
        for error in report['errors']:
            print(f" - {error}")
    if report['warnings']:
        print("\nWarnings:")
        for warning in report['warnings']:
            print(f" - {warning}")

    # Fail if there are errors
    if not report['passed']:
        raise ValueError(f"Data validation failed with {len(report['errors'])} errors")
    print("\n✓ Data validation passed")
Using Great Expectations with SEER
Advanced data validation framework
from seerpy import Seer
import great_expectations as ge
import pandas as pd
# SEER client whose monitor wraps the Great Expectations run below.
seer = Seer(apiKey='YOUR_API_KEY')

with seer.monitor("ge-validation", capture_logs=True):
    # Load the CSV as a Great Expectations dataset.
    # NOTE(review): ge.read_csv looks like the legacy dataset-style API;
    # confirm the installed Great Expectations version still provides it.
    df = ge.read_csv('customer_data.csv')

    # Table-level expectation
    df.expect_table_row_count_to_be_between(min_value=100, max_value=1000000)

    # Column-level expectations
    df.expect_column_values_to_not_be_null('customer_id')
    df.expect_column_values_to_be_unique('customer_id')
    df.expect_column_values_to_be_in_set('status', ['active', 'inactive', 'pending'])
    df.expect_column_values_to_match_regex('email', r'^[\w\.-]+@[\w\.-]+\.\w+$')
    # NOTE(review): this constrains the column *mean*, not each value;
    # confirm that is the intended rule for 'age'.
    df.expect_column_mean_to_be_between('age', min_value=18, max_value=100)

    # Run every expectation registered above
    results = df.validate()

    # Summarise the outcome
    print("Validation Results:")
    stats = results['statistics']
    print(f" Success: {results['success']}")
    print(f" Evaluated Expectations: {stats['evaluated_expectations']}")
    print(f" Successful Expectations: {stats['successful_expectations']}")
    print(f" Failed Expectations: {stats['unsuccessful_expectations']}")

    # On failure, name each unmet expectation and abort the monitored run
    if not results['success']:
        for outcome in results['results']:
            if not outcome['success']:
                print(f" Failed: {outcome['expectation_config']['expectation_type']}")
        raise ValueError("Data validation failed")

    print("✓ All expectations met")
Schema Validation
Validate data schema matches expected structure
from seerpy import Seer
import pandas as pd
from pandera import Column, DataFrameSchema, Check
# SEER client whose monitor wraps the schema validation run below.
seer = Seer(apiKey='YOUR_API_KEY')

# Expected structure of the orders data: one entry per column, giving its
# dtype and, where applicable, a value-level check.
schema = DataFrameSchema(
    columns={
        "order_id": Column(int, checks=Check.greater_than(0)),
        "customer_id": Column(str, checks=Check.str_length(min_value=1, max_value=50)),
        "amount": Column(float, checks=Check.in_range(min_value=0, max_value=100000)),
        "quantity": Column(int, checks=Check.greater_than_equal_to(1)),
        "status": Column(str, checks=Check.isin(['pending', 'shipped', 'delivered'])),
        "order_date": Column(pd.DatetimeTZDtype(tz="UTC")),
    }
)

with seer.monitor("schema-validation", capture_logs=True):
    # Load the orders and convert order_date to UTC-aware timestamps so it
    # can match the schema's DatetimeTZDtype column.
    df = pd.read_csv('orders.csv')
    df['order_date'] = pd.to_datetime(df['order_date'], utc=True)

    # Validate against the schema; lazy=True is passed through to pandera.
    try:
        validated_df = schema.validate(df, lazy=True)
        record_count = len(validated_df)
        print(f"✓ Schema validation passed for {record_count} records")
    except Exception as exc:
        # Report the failure, then re-raise so it propagates out of the
        # monitor block.
        print("✗ Schema validation failed:")
        print(str(exc))
        raise