Error Handling

Best practices for handling errors and failures in your pipelines

Automatic Error Tracking
SEER automatically captures and reports errors in your monitored code
from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

# Errors are automatically captured and reported
with seer.monitor("data-processing"):
    data = fetch_data()
    process_data(data)  # If this raises an exception, SEER captures it
    
# The error traceback is sent to SEER with:
# - Full stack trace
# - Error message
# - Timestamp
# - Job metadata

Graceful Error Handling
Handle errors gracefully while still reporting to SEER
from seerpy import Seer
import logging

seer = Seer(apiKey='YOUR_API_KEY')
logger = logging.getLogger(__name__)

with seer.monitor("etl-pipeline", capture_logs=True):
    try:
        # Critical operation
        data = extract_data()
        transformed = transform_data(data)
        load_data(transformed)
        
    except ConnectionError as e:
        # Handle specific errors
        logger.error(f"Database connection failed: {e}")
        # Try fallback method
        load_to_backup_location(transformed)
        
    except Exception as e:
        # Log unexpected errors
        logger.critical(f"Unexpected error: {e}")
        # SEER will capture this in the logs
        raise  # Re-raise to mark job as failed

Retry Logic with Exponential Backoff
Implement retry logic for transient failures
from seerpy import Seer
import time
import requests

seer = Seer(apiKey='YOUR_API_KEY')

def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            if attempt == max_retries - 1:
                raise  # Final attempt failed
            
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"⚠ Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)

with seer.monitor("api-data-fetch", capture_logs=True):
    data = fetch_with_retry("https://api.example.com/data")
    process_data(data)

Partial Failure Handling
Continue processing when some items fail
from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

def process_batch(items):
    successful = []
    failed = []
    
    for item in items:
        try:
            result = process_item(item)
            successful.append(result)
        except Exception as e:
            failed.append({"item": item, "error": str(e)})
    
    return successful, failed

with seer.monitor("batch-processing", 
                  capture_logs=True,
                  metadata={"total_items": len(items)}):
    successful, failed = process_batch(items)
    
    print(f"✓ Processed {len(successful)} items successfully")
    if failed:
        print(f"⚠ {len(failed)} items failed")
        # Log failed items for review
        for failure in failed:
            print(f"  - {failure['item']}: {failure['error']}")
    
    # Job completes successfully even with partial failures
    # Failed items are logged for investigation
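
    # Optional (illustrative): to have a high failure rate mark the run as failed instead,
    # re-raise before the with block ends; SEER marks the job failed when an exception
    # propagates, as in the earlier examples (the 10% threshold here is an example choice)
    if len(failed) > 0.1 * len(items):
        raise RuntimeError(f"{len(failed)} of {len(items)} items failed")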

Offline Mode & Network Failures
SEER handles network failures gracefully

Automatic Retry

SEER automatically retries failed API calls with exponential backoff (1s, 2s, 4s)
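
If it helps to see that schedule concretely, here is a minimal sketch of the pattern only, not SEER's internal code; send stands for any callable that performs the API request:

import time

def send_with_backoff(send, retries=3):
    """Illustrative 1s/2s/4s exponential backoff loop (not SEER internals)."""
    for attempt in range(retries + 1):  # one initial try plus up to 3 retries
        try:
            return send()
        except Exception:
            if attempt == retries:
                raise  # all retries exhausted
            time.sleep(2 ** attempt)  # waits of 1s, 2s, 4s between attempts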

Payload Saving

If all retries fail, SEER saves the payload locally for manual recovery:

# Failed payloads are saved to:
./failed_payloads/monitoring_<timestamp>.json
./failed_payloads/heartbeat_<timestamp>.json

# You can manually retry by sending the saved payload:
curl -X POST https://api.seer.ansrstudio.com/monitoring \
  -H "Authorization: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d @./failed_payloads/monitoring_1234567890.json

Automatic Replay of Failed Payloads

Use this helper function to automatically replay all failed payloads:

import os
import json
import requests

def replay_failed_payloads(api_key: str):
    """Automatically replay all failed payloads from the failed_payloads directory"""
    temp_dir = os.path.join(os.path.dirname(__file__), "failed_payloads")
    
    if not os.path.exists(temp_dir):
        return
    
    for filename in os.listdir(temp_dir):
        filepath = os.path.join(temp_dir, filename)
        
        # Load the saved payload
        with open(filepath, "r") as f:
            payload = json.load(f)
        
        # Set up headers
        headers = {
            "Authorization": api_key,
            "Content-Type": "application/json"
        }
        
        # Determine endpoint based on filename
        if "monitoring" in filename:
            url = "https://api.seer.ansrstudio.com/monitoring"
        elif "heartbeat" in filename:
            url = "https://api.seer.ansrstudio.com/heartbeat"
        else:
            continue  # Skip unknown file types
        
        try:
            # Attempt to send the payload
            response = requests.post(url, headers=headers, json=payload, timeout=10)
            response.raise_for_status()
            
            # If successful, remove the file
            os.remove(filepath)
            print(f"✓ Successfully replayed {filename}")
            
        except Exception as e:
            # Leave the file for next retry
            print(f"⚠ Failed to replay {filename}: {e}")
            continue

# Usage example:
replay_failed_payloads("YOUR_API_KEY")

Run this function periodically (e.g., via cron) to automatically retry failed API calls when connectivity is restored.
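
For example, a crontab entry along these lines (the interval and script path are placeholders; the script only needs to call replay_failed_payloads with your API key):

# Hypothetical crontab entry: retry failed payloads every 15 minutes
*/15 * * * * /usr/bin/python3 /opt/pipelines/replay_failed_payloads.py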

Error Notification Setup
Configure Slack notifications for pipeline failures

1. Connect Slack Workspace

Go to Dashboard → Settings → Integrations → Connect Slack

2. Configure Notifications

Set up alerts for:

  • Job failures
  • Missed heartbeats
  • Long-running jobs
  • Error rate thresholds

3. Test Notifications

from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

# This will trigger a failure notification
with seer.monitor("test-notification"):
    raise Exception("Test error for Slack notification")

Best Practices Summary

  • Use capture_logs=True for debugging: capture stdout/stderr to see what happened before a failure
  • Add metadata for context: include relevant information such as record counts and file names
  • Implement retry logic for transient failures: network issues and API rate limits are usually temporary
  • Handle partial failures gracefully: don't fail the entire batch because one item fails
  • Set up Slack notifications: get alerted immediately when a pipeline fails