Monitor data extraction, transformation, and loading with pandas
from seerpy import Seer
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
from datetime import datetime
# Initialize SEER
seer = Seer(apiKey='YOUR_API_KEY')
# Database connections
SOURCE_DB = create_engine('postgresql://user:pass@source-db:5432/data')
WAREHOUSE_DB = create_engine('postgresql://user:pass@warehouse:5432/analytics')
def extract_data():
"""Extract data from multiple sources"""
# Read from CSV
sales = pd.read_csv('sales_data.csv')
# Read from database
customers = pd.read_sql('SELECT * FROM customers WHERE active = true', SOURCE_DB)
products = pd.read_sql('SELECT * FROM products', SOURCE_DB)
print(f"Extracted {len(sales)} sales records")
print(f"Extracted {len(customers)} customers")
print(f"Extracted {len(products)} products")
return sales, customers, products
def transform_data(sales, customers, products):
"""Transform and enrich data"""
# Merge datasets
enriched = sales.merge(customers, on='customer_id', how='left')
enriched = enriched.merge(products, on='product_id', how='left')
# Data cleaning
enriched['sale_date'] = pd.to_datetime(enriched['sale_date'])
enriched['total_amount'] = enriched['quantity'] * enriched['unit_price']
# Handle missing values
    enriched['customer_name'] = enriched['customer_name'].fillna('Unknown')  # chained inplace fillna is deprecated in pandas 2.x
# Remove duplicates
enriched.drop_duplicates(subset=['order_id'], inplace=True)
# Add computed columns
    enriched['month'] = enriched['sale_date'].dt.to_period('M').astype(str)  # str, since Period values can't be written by to_sql
enriched['year'] = enriched['sale_date'].dt.year
# Create aggregations
monthly_summary = enriched.groupby(['year', 'month', 'product_id']).agg({
'order_id': 'count',
'total_amount': 'sum',
'quantity': 'sum'
}).reset_index()
monthly_summary.columns = ['year', 'month', 'product_id', 'order_count', 'revenue', 'units_sold']
print(f"Transformed to {len(enriched)} enriched records")
print(f"Created {len(monthly_summary)} summary records")
return enriched, monthly_summary
def load_data(enriched, summary):
"""Load data to warehouse"""
# Load enriched data
enriched.to_sql('sales_enriched', WAREHOUSE_DB, if_exists='append', index=False)
# Load summary data
summary.to_sql('monthly_sales_summary', WAREHOUSE_DB, if_exists='append', index=False)
print(f"Loaded {len(enriched)} records to sales_enriched")
print(f"Loaded {len(summary)} records to monthly_sales_summary")
def validate_data(enriched):
"""Data quality checks"""
issues = []
# Check for nulls in critical columns
if enriched['customer_id'].isnull().any():
issues.append(f"{enriched['customer_id'].isnull().sum()} null customer IDs")
# Check for negative amounts
if (enriched['total_amount'] < 0).any():
issues.append(f"{(enriched['total_amount'] < 0).sum()} negative amounts")
# Check for future dates
if (enriched['sale_date'] > datetime.now()).any():
issues.append(f"{(enriched['sale_date'] > datetime.now()).sum()} future dates")
if issues:
raise ValueError(f"Data quality issues: {', '.join(issues)}")
print("Data validation passed")
# Main ETL process with monitoring
with seer.monitor("daily-etl-pipeline",
capture_logs=True,
metadata={
"run_date": str(datetime.now().date()),
"environment": "production"
}):
# Extract phase
sales, customers, products = extract_data()
# Transform phase
enriched, summary = transform_data(sales, customers, products)
# Validate phase
validate_data(enriched)
# Load phase
load_data(enriched, summary)
print(f"ETL pipeline completed successfully at {datetime.now()}")
Process only new records with an incremental checkpoint
from seerpy import Seer
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta
import pickle
import os
seer = Seer(apiKey='YOUR_API_KEY')
# Database connections
engine = create_engine('postgresql://user:pass@source-db:5432/data')
warehouse_engine = create_engine('postgresql://user:pass@warehouse:5432/analytics')
CHECKPOINT_FILE = 'etl_checkpoint.pkl'
def load_checkpoint():
"""Load last successful run timestamp"""
if os.path.exists(CHECKPOINT_FILE):
with open(CHECKPOINT_FILE, 'rb') as f:
return pickle.load(f)
return datetime.now() - timedelta(days=30) # Default: 30 days back
def save_checkpoint(timestamp):
"""Save successful run timestamp"""
with open(CHECKPOINT_FILE, 'wb') as f:
pickle.dump(timestamp, f)
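Pickle is convenient here, but a crash mid-write can leave a truncated checkpoint file. A defensive variant of save_checkpoint, under the same CHECKPOINT_FILE convention, writes to a temporary file first and swaps it in with os.replace, which is atomic on both POSIX and Windows:
def save_checkpoint(timestamp):
    """Save successful run timestamp atomically"""
    tmp = CHECKPOINT_FILE + '.tmp'
    with open(tmp, 'wb') as f:
        pickle.dump(timestamp, f)
    os.replace(tmp, CHECKPOINT_FILE)  # atomic swap: readers never see a partial file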
with seer.monitor("incremental-etl", capture_logs=True):
last_run = load_checkpoint()
current_run = datetime.now()
    # Extract only new records (bound parameters avoid interpolating values into the SQL string)
    query = text("""
        SELECT * FROM transactions
        WHERE created_at > :last_run
          AND created_at <= :current_run
    """)
    df = pd.read_sql(query, engine,
                     params={'last_run': last_run, 'current_run': current_run})
print(f"Processing {len(df)} new records since {last_run}")
# Transform and load
if len(df) > 0:
# Your transformation logic here
transformed = df.copy()
# Load to warehouse
        transformed.to_sql('transactions_fact', warehouse_engine,
                           if_exists='append', index=False)
# Save checkpoint only if successful
save_checkpoint(current_run)
print(f"Checkpoint saved: {current_run}")
else:
print("No new records to process")
Retry flaky extractions automatically with tenacity
from seerpy import Seer
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential
import logging
from sqlalchemy import create_engine
seer = Seer(apiKey='YOUR_API_KEY')
# Destination for the cleaned data
engine = create_engine('postgresql://user:pass@warehouse:5432/analytics')
@retry(stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10))
def extract_with_retry(source_url):
"""Extract data with automatic retries"""
try:
df = pd.read_csv(source_url)
return df
except Exception as e:
logging.error(f"Extract failed: {e}")
raise
with seer.monitor("etl-with-retries", capture_logs=True):
try:
# Attempt extraction with retries
data = extract_with_retry('https://api.example.com/data.csv')
# Process data
processed = data.dropna().drop_duplicates()
# Load to destination
        processed.to_sql('staging_table', engine, if_exists='replace', index=False)
print(f"Successfully processed {len(processed)} records")
except Exception as e:
# SEER automatically captures the error
logging.error(f"ETL failed after retries: {e}")
raise # Re-raise so SEER marks job as failed
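tenacity can also log every failed attempt before it sleeps, which makes each retry visible in the logs SEER captures. A small sketch using tenacity's built-in before_sleep_log hook; the logger name is illustrative:
import logging
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential, before_sleep_log

logger = logging.getLogger("etl")

@retry(stop=stop_after_attempt(3),
       wait=wait_exponential(multiplier=1, min=4, max=10),
       before_sleep=before_sleep_log(logger, logging.WARNING))
def extract_with_retry(source_url):
    # Each failed attempt is logged at WARNING before the next wait
    return pd.read_csv(source_url)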