Back to Examples

Pandas ETL Pipeline Monitoring

Monitor data extraction, transformation, and loading with pandas

Complete Example
Full ETL Pipeline with SEER Monitoring
Extract from a CSV file and a source database, transform with pandas, and load into a warehouse database
from seerpy import Seer
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from datetime import datetime

# Initialize the SEER client. 'YOUR_API_KEY' is a placeholder for a real key.
seer = Seer(apiKey='YOUR_API_KEY')

# Database connections. SOURCE_DB is queried by extract_data() and
# WAREHOUSE_DB is written by load_data(). The URLs carry placeholder
# credentials (user:pass).
# NOTE(review): real credentials should presumably come from configuration
# or the environment rather than source code - confirm before deploying.
SOURCE_DB = create_engine('postgresql://user:pass@source-db:5432/data')
WAREHOUSE_DB = create_engine('postgresql://user:pass@warehouse:5432/analytics')

def extract_data():
    """Pull the raw sales, customer, and product tables.

    Sales are read from ``sales_data.csv`` (relative to the working
    directory); active customers and all products are queried from
    ``SOURCE_DB``.

    Returns:
        A ``(sales, customers, products)`` tuple of DataFrames.
    """
    active_customers_sql = 'SELECT * FROM customers WHERE active = true'
    all_products_sql = 'SELECT * FROM products'

    # File-based source first, then the two database-backed sources.
    sales = pd.read_csv('sales_data.csv')
    customers = pd.read_sql(active_customers_sql, SOURCE_DB)
    products = pd.read_sql(all_products_sql, SOURCE_DB)

    # Row counts per source make a short or empty extract visible in the logs.
    print(f"Extracted {len(sales)} sales records")
    print(f"Extracted {len(customers)} customers")
    print(f"Extracted {len(products)} products")

    return sales, customers, products

def transform_data(sales, customers, products):
    """Join sales with customer/product attributes and build a monthly summary.

    Args:
        sales: Sales rows; joined to ``customers`` on ``customer_id`` and to
            ``products`` on ``product_id`` (both left joins, so every sales
            row is kept).
        customers: Customer rows keyed by ``customer_id``.
        products: Product rows keyed by ``product_id``.

    After the joins the combined frame must contain ``order_id``,
    ``sale_date``, ``quantity``, ``unit_price`` and ``customer_name``.

    Returns:
        ``(enriched, monthly_summary)``. ``enriched`` holds one row per
        ``order_id`` with ``total_amount``, ``month`` and ``year`` added.
        ``monthly_summary`` has the columns ``year``, ``month``,
        ``product_id``, ``order_count``, ``revenue`` and ``units_sold``.
    """
    # Merge datasets
    enriched = (
        sales
        .merge(customers, on='customer_id', how='left')
        .merge(products, on='product_id', how='left')
    )

    # Data cleaning
    enriched['sale_date'] = pd.to_datetime(enriched['sale_date'])
    enriched['total_amount'] = enriched['quantity'] * enriched['unit_price']

    # Handle missing values. Assign the result back rather than calling
    # fillna(inplace=True) on the column selection: that chained form is
    # deprecated and does not modify the frame under pandas Copy-on-Write.
    enriched['customer_name'] = enriched['customer_name'].fillna('Unknown')

    # Remove duplicates (keeps the first row seen for each order_id)
    enriched.drop_duplicates(subset=['order_id'], inplace=True)

    # Add computed columns
    # NOTE(review): 'month' is a pandas Period; confirm the warehouse driver
    # can store it before relying on to_sql for this column.
    enriched['month'] = enriched['sale_date'].dt.to_period('M')
    enriched['year'] = enriched['sale_date'].dt.year

    # Create aggregations. Named aggregation sets the output column names
    # directly, so they cannot drift out of step with the aggregated columns.
    monthly_summary = (
        enriched
        .groupby(['year', 'month', 'product_id'])
        .agg(
            order_count=('order_id', 'count'),
            revenue=('total_amount', 'sum'),
            units_sold=('quantity', 'sum'),
        )
        .reset_index()
    )

    print(f"Transformed to {len(enriched)} enriched records")
    print(f"Created {len(monthly_summary)} summary records")

    return enriched, monthly_summary

def load_data(enriched, summary):
    """Append the enriched rows and the monthly summary to the warehouse.

    Both tables are written inside a single transaction on ``WAREHOUSE_DB``
    so a failure while writing the summary does not leave
    ``sales_enriched`` already appended (which would duplicate rows when
    the job is rerun).

    Args:
        enriched: DataFrame appended to the ``sales_enriched`` table.
        summary: DataFrame appended to the ``monthly_sales_summary`` table.
    """
    # Engine.begin() commits when the block exits normally and rolls back
    # if either write raises.
    with WAREHOUSE_DB.begin() as conn:
        # Load enriched data
        enriched.to_sql('sales_enriched', conn, if_exists='append', index=False)

        # Load summary data
        summary.to_sql('monthly_sales_summary', conn, if_exists='append', index=False)

    print(f"Loaded {len(enriched)} records to sales_enriched")
    print(f"Loaded {len(summary)} records to monthly_sales_summary")

def validate_data(enriched):
    """Run data quality checks and fail loudly if any of them trips.

    Checks performed on ``enriched``:
      * ``customer_id`` contains no nulls,
      * ``total_amount`` is never negative,
      * ``sale_date`` is never later than the current local time.

    Args:
        enriched: DataFrame with ``customer_id``, ``total_amount`` and
            ``sale_date`` columns.

    Raises:
        ValueError: If one or more checks fail; the message lists each
            failing check together with the number of offending rows.
    """
    # Read the clock once so the future-date check and its reported count
    # are evaluated against the same instant.
    now = datetime.now()

    # Each mask is computed once and reused for both the test and the count.
    checks = (
        (enriched['customer_id'].isnull(), 'null customer IDs'),
        (enriched['total_amount'] < 0, 'negative amounts'),
        (enriched['sale_date'] > now, 'future dates'),
    )
    issues = [f"{mask.sum()} {label}" for mask, label in checks if mask.any()]

    if issues:
        raise ValueError(f"Data quality issues: {', '.join(issues)}")

    print("Data validation passed")

# Main ETL process with monitoring: extract -> transform -> validate -> load,
# all inside one seer.monitor() context tagged with the run date/environment.
run_metadata = {
    "run_date": str(datetime.now().date()),
    "environment": "production",
}

with seer.monitor("daily-etl-pipeline", capture_logs=True, metadata=run_metadata):

    # Extract phase
    sales, customers, products = extract_data()

    # Transform phase
    enriched, summary = transform_data(sales, customers, products)

    # Validate phase (raises before anything is written to the warehouse)
    validate_data(enriched)

    # Load phase
    load_data(enriched, summary)

    print(f"ETL pipeline completed successfully at {datetime.now()}")
Incremental ETL with Checkpointing
Process only new records since last run
from seerpy import Seer
import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import pickle
import os

# Initialize the SEER client; 'YOUR_API_KEY' is a placeholder for a real key.
seer = Seer(apiKey='YOUR_API_KEY')

# Pickle file holding the timestamp of the last successful run. The path is
# relative, so it resolves against the process's working directory.
CHECKPOINT_FILE = 'etl_checkpoint.pkl'

def load_checkpoint():
    """Load the timestamp of the last successful run.

    Returns:
        The object stored in ``CHECKPOINT_FILE`` (a ``datetime`` when it was
        written by ``save_checkpoint``), or "now minus 30 days" when no
        checkpoint file exists yet.
    """
    # Open directly and handle the missing-file case, instead of checking
    # os.path.exists() first: the file could disappear between the check
    # and the open.
    try:
        with open(CHECKPOINT_FILE, 'rb') as f:
            # SECURITY NOTE: pickle.load executes arbitrary code from the
            # file. This is only acceptable because the checkpoint is
            # written by this job itself; never point CHECKPOINT_FILE at a
            # location other users or systems can write to.
            return pickle.load(f)
    except FileNotFoundError:
        return datetime.now() - timedelta(days=30)  # Default: 30 days back

def save_checkpoint(timestamp):
    """Persist the timestamp of a successful run to ``CHECKPOINT_FILE``.

    The value is pickled to a temporary sibling file and then moved into
    place with ``os.replace``, so an interruption mid-write cannot leave a
    truncated checkpoint that would break the next ``load_checkpoint`` call.

    Args:
        timestamp: The value to store (the run's ``datetime``).
    """
    tmp_path = f"{CHECKPOINT_FILE}.tmp"
    with open(tmp_path, 'wb') as f:
        pickle.dump(timestamp, f)
    # os.replace overwrites the destination in one step (atomic on POSIX
    # when both paths are on the same filesystem).
    os.replace(tmp_path, CHECKPOINT_FILE)

from sqlalchemy import text

# Source and warehouse connections used below (placeholder URLs and
# credentials - replace with real ones).
engine = create_engine('postgresql://user:pass@source-db:5432/data')
warehouse_engine = create_engine('postgresql://user:pass@warehouse:5432/analytics')

with seer.monitor("incremental-etl", capture_logs=True):
    last_run = load_checkpoint()
    current_run = datetime.now()

    # Extract only new records: the half-open window (last_run, current_run].
    # Timestamps are passed as bound parameters rather than formatted into
    # the SQL string, so the driver handles quoting and datetime conversion.
    query = text("""
        SELECT * FROM transactions
        WHERE created_at > :last_run
        AND created_at <= :current_run
    """)

    df = pd.read_sql(
        query,
        engine,
        params={"last_run": last_run, "current_run": current_run},
    )
    print(f"Processing {len(df)} new records since {last_run}")

    # Transform and load
    if not df.empty:
        # Your transformation logic here
        transformed = df.copy()

        # Load to warehouse. index=False: the frame's index is just the
        # 0..n-1 row number from read_sql and restarts on every run, so it
        # should not be appended as a column.
        transformed.to_sql('transactions_fact', warehouse_engine,
                           if_exists='append', index=False)

        # Save checkpoint only if successful (to_sql raises on failure, so
        # this line is reached only after the load completed).
        save_checkpoint(current_run)
        print(f"Checkpoint saved: {current_run}")
    else:
        print("No new records to process")
Error Handling and Retry Logic
Robust ETL with automatic retries
from seerpy import Seer
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential
import logging

# Initialize the SEER client; 'YOUR_API_KEY' is a placeholder for a real key.
seer = Seer(apiKey='YOUR_API_KEY')

@retry(stop=stop_after_attempt(3),
       wait=wait_exponential(multiplier=1, min=4, max=10),
       reraise=True)
def extract_with_retry(source_url):
    """Read a CSV from ``source_url``, retrying on failure.

    The call is attempted up to 3 times with an exponential wait between
    attempts (bounded by the ``min=4`` / ``max=10`` settings above).
    ``reraise=True`` makes the final failure surface as the original
    exception instead of tenacity's ``RetryError`` wrapper, so callers and
    logs see the real cause.

    Args:
        source_url: Path or URL accepted by ``pandas.read_csv``.

    Returns:
        The parsed DataFrame.

    Raises:
        Exception: Whatever ``pandas.read_csv`` raised on the last attempt.
    """
    try:
        return pd.read_csv(source_url)
    except Exception as e:
        # Log every failed attempt, then re-raise so tenacity can retry.
        logging.error("Extract failed: %s", e)
        raise

from sqlalchemy import create_engine

# Destination connection used below (placeholder URL and credentials -
# replace with real ones).
engine = create_engine('postgresql://user:pass@warehouse:5432/analytics')

with seer.monitor("etl-with-retries", capture_logs=True):
    try:
        # Attempt extraction with retries
        data = extract_with_retry('https://api.example.com/data.csv')

        # Process data: drop rows containing any missing value, then exact
        # duplicate rows.
        processed = data.dropna().drop_duplicates()

        # Load to destination (the staging table is rebuilt on every run)
        processed.to_sql('staging_table', engine, if_exists='replace')

        print(f"Successfully processed {len(processed)} records")

    except Exception as e:
        # SEER automatically captures the error
        logging.error("ETL failed after retries: %s", e)
        raise  # Re-raise so SEER marks job as failed
Best Practices
  • Use checkpointing for incremental loads
  • Implement data quality validation
  • Add retry logic for network operations
  • Log row counts at each stage
  • Use metadata to track run context
  • Validate data before loading
Common Pitfalls
  • Not handling missing data properly
  • Loading duplicate records
  • No validation before load
  • Missing error handling
  • Not tracking data lineage
  • Forgetting to update checkpoints