Best practices for handling errors and failures in your pipelines
from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

# Errors are automatically captured and reported
with seer.monitor("data-processing"):
    data = fetch_data()
    process_data(data)  # If this raises an exception, SEER captures it

# The error traceback is sent to SEER with:
# - Full stack trace
# - Error message
# - Timestamp
# - Job metadata

from seerpy import Seer
import logging

seer = Seer(apiKey='YOUR_API_KEY')
logger = logging.getLogger(__name__)

with seer.monitor("etl-pipeline", capture_logs=True):
    # Extract and transform first so `transformed` already exists
    # if the fallback path below needs it
    data = extract_data()
    transformed = transform_data(data)
    try:
        # Critical operation
        load_data(transformed)
    except ConnectionError as e:
        # Handle specific, recoverable errors
        logger.error(f"Database connection failed: {e}")
        # Try fallback method
        load_to_backup_location(transformed)
    except Exception as e:
        # Log unexpected errors
        logger.critical(f"Unexpected error: {e}")
        # SEER will capture this in the logs
        raise  # Re-raise to mark the job as failed

from seerpy import Seer
import time

import requests

seer = Seer(apiKey='YOUR_API_KEY')

def fetch_with_retry(url, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=10)
            response.raise_for_status()
            return response.json()
        except requests.RequestException:
            if attempt == max_retries - 1:
                raise  # Final attempt failed
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"⚠ Attempt {attempt + 1} failed, retrying in {wait_time}s...")
            time.sleep(wait_time)

with seer.monitor("api-data-fetch", capture_logs=True):
    data = fetch_with_retry("https://api.example.com/data")
    process_data(data)

from seerpy import Seer
seer = Seer(apiKey='YOUR_API_KEY')

def process_batch(items):
    successful = []
    failed = []
    for item in items:
        try:
            result = process_item(item)
            successful.append(result)
        except Exception as e:
            failed.append({"item": item, "error": str(e)})
    return successful, failed

with seer.monitor("batch-processing",
                  capture_logs=True,
                  metadata={"total_items": len(items)}):
    successful, failed = process_batch(items)
    print(f"✓ Processed {len(successful)} items successfully")
    if failed:
        print(f"⚠ {len(failed)} items failed")
        # Log failed items for review
        for failure in failed:
            print(f"  - {failure['item']}: {failure['error']}")
    # Job completes successfully even with partial failures
    # Failed items are logged for investigation

SEER automatically retries failed API calls with exponential backoff (1s, 2s, 4s).
If all retries fail, SEER saves the payload locally for manual recovery:
# Failed payloads are saved to:
#   ./failed_payloads/monitoring_<timestamp>.json
#   ./failed_payloads/heartbeat_<timestamp>.json

# You can manually retry by sending the saved payload:
curl -X POST https://api.seer.ansrstudio.com/monitoring \
  -H "Authorization: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d @./failed_payloads/monitoring_1234567890.json
Use this helper function to automatically replay all failed payloads:
import os
import json

import requests

def replay_failed_payloads(api_key: str):
    """Automatically replay all failed payloads from the failed_payloads directory."""
    temp_dir = os.path.join(os.path.dirname(__file__), "failed_payloads")
    if not os.path.exists(temp_dir):
        return
    for filename in os.listdir(temp_dir):
        filepath = os.path.join(temp_dir, filename)
        # Load the saved payload
        with open(filepath, "r") as f:
            payload = json.load(f)
        # Set up headers
        headers = {
            "Authorization": api_key,
            "Content-Type": "application/json"
        }
        # Determine endpoint based on filename
        if "monitoring" in filename:
            url = "https://api.seer.ansrstudio.com/monitoring"
        elif "heartbeat" in filename:
            url = "https://api.seer.ansrstudio.com/heartbeat"
        else:
            continue  # Skip unknown file types
        try:
            # Attempt to send the payload
            response = requests.post(url, headers=headers, json=payload)
            response.raise_for_status()
            # If successful, remove the file
            os.remove(filepath)
            print(f"✓ Successfully replayed {filename}")
        except Exception as e:
            # Leave the file for the next retry
            print(f"⚠ Failed to replay {filename}: {e}")
            continue

# Usage example:
replay_failed_payloads("YOUR_API_KEY")

Run this function periodically (e.g., via cron) to automatically retry failed API calls when connectivity is restored.
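For example, here is a minimal sketch of a wrapper script you could point cron at. The module name replay_helpers and the SEER_API_KEY environment variable are assumptions for illustration, not part of seerpy:

# replay_failed.py: hypothetical wrapper script for scheduling the helper above.
# "replay_helpers" is an assumed local module where you saved
# replay_failed_payloads(); SEER_API_KEY is an environment variable you set
# yourself. Neither name comes from seerpy.
import os

from replay_helpers import replay_failed_payloads

if __name__ == "__main__":
    api_key = os.environ["SEER_API_KEY"]  # assumed env var, set by you
    replay_failed_payloads(api_key)

A crontab entry along the lines of "*/15 * * * * /usr/bin/python3 /path/to/replay_failed.py" would then retry any saved payloads every 15 minutes; adjust the interval and paths to your environment.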
Go to Dashboard → Settings → Integrations → Connect Slack
Set up alerts for the failure events you care about, then verify the integration by triggering a deliberate test error:
from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

# This will trigger a failure notification
with seer.monitor("test-notification"):
    raise Exception("Test error for Slack notification")

Use capture_logs=True for debugging: capture stdout/stderr so you can see what happened right before a failure.
Add metadata for context: include relevant information such as record counts and file names (see the sketch after this list).
Implement retry logic for transient failures: network issues and API rate limits are usually temporary.
Handle partial failures gracefully: don't fail an entire batch because one item fails.
Set up Slack notifications: get alerted immediately when pipelines fail.
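As one illustration of the metadata tip above, here is a hedged sketch that attaches a file name and a record count to a monitored job. The job name, file path, and the load_csv/process_rows helpers are made-up placeholders, not part of seerpy; only the capture_logs and metadata parameters come from the examples earlier on this page:

from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

# Placeholder values for illustration; substitute your own job details
input_file = "exports/orders_2024-05-01.csv"

with seer.monitor("daily-order-import",
                  capture_logs=True,
                  metadata={"input_file": input_file,
                            "expected_rows": 10000}):
    rows = load_csv(input_file)   # placeholder helper
    print(f"Loaded {len(rows)} rows from {input_file}")
    process_rows(rows)            # placeholder helper

If this job fails, the captured logs and the metadata together tell you which file was being processed and how many records were expected, which is usually enough context to start debugging.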