Back to Examples

Scheduled Jobs Monitoring

Monitor cron jobs, scheduled tasks, and recurring processes

Cron Job
Linux Cron Job with SEER
Monitor daily backup jobs

Python Script (daily_backup.py):

#!/usr/bin/env python3
from seerpy import Seer
import subprocess
import os
from datetime import datetime

seer = Seer(apiKey=os.getenv('SEER_API_KEY'))

def backup_database():
    """Dump the production PostgreSQL database to a timestamped SQL file.

    Returns:
        tuple[str, float]: path of the backup file and its size in MB.

    Raises:
        Exception: if pg_dump exits with a non-zero status.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    out_path = f'/backups/db_backup_{stamp}.sql'

    # Invoke pg_dump as an argument list (no shell) against the local server.
    dump_cmd = [
        'pg_dump',
        '-h', 'localhost',
        '-U', 'postgres',
        '-d', 'production_db',
        '-f', out_path,
    ]
    proc = subprocess.run(dump_cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise Exception(f"Backup failed: {proc.stderr}")

    # Verify backup file
    size_mb = os.path.getsize(out_path) / (1024 * 1024)
    print(f"Backup created: {out_path} ({size_mb:.2f} MB)")
    return out_path, size_mb

def cleanup_old_backups(backup_dir='/backups', max_age_days=30):
    """Remove backup files older than ``max_age_days`` from ``backup_dir``.

    Args:
        backup_dir: directory to scan (default '/backups', the original
            hard-coded location, so existing callers are unaffected).
        max_age_days: files whose mtime is older than this many days
            are deleted (default 30, as before).

    Returns:
        int: number of files removed.
    """
    cutoff = datetime.now().timestamp() - (max_age_days * 24 * 60 * 60)
    removed = 0

    for filename in os.listdir(backup_dir):
        filepath = os.path.join(backup_dir, filename)
        # Only prune regular files: os.remove() raises on directories,
        # which would have crashed the original implementation.
        if os.path.isfile(filepath) and os.path.getmtime(filepath) < cutoff:
            os.remove(filepath)
            removed += 1

    print(f"Removed {removed} old backups")
    return removed

# Monitor the backup job
# The whole run executes inside a SEER monitor context so duration, output
# (capture_logs=True) and the host name (metadata) are reported under the
# "daily-database-backup" check. Any exception raised by the steps below
# propagates out of the context manager — presumably marking the run as
# failed; the exact failure semantics are defined by seerpy, not here.
with seer.monitor("daily-database-backup", 
                  capture_logs=True,
                  metadata={"server": os.uname().nodename}):
    
    # Step 1: dump the database; Step 2: prune backups older than 30 days.
    backup_file, size = backup_database()
    removed = cleanup_old_backups()
    
    # Summary lines end up in the captured logs for the SEER dashboard.
    print(f"Backup completed successfully")
    print(f"File: {backup_file}")
    print(f"Size: {size:.2f} MB")
    print(f"Old backups removed: {removed}")

Crontab Configuration:

# Set SEER API key
SEER_API_KEY=your_api_key_here

# Run backup daily at 2 AM
0 2 * * * /usr/bin/python3 /scripts/daily_backup.py >> /var/log/backup.log 2>&1

# Run data sync every 6 hours
0 */6 * * * /usr/bin/python3 /scripts/data_sync.py >> /var/log/sync.log 2>&1

# Run cleanup weekly on Sunday at 3 AM
0 3 * * 0 /usr/bin/python3 /scripts/weekly_cleanup.py >> /var/log/cleanup.log 2>&1
APScheduler
Python APScheduler Integration
Monitor scheduled tasks in Python applications
from apscheduler.schedulers.background import BackgroundScheduler
from seerpy import Seer
import time

seer = Seer(apiKey='YOUR_API_KEY')

def hourly_sync():
    """Hourly job: pull records from the upstream API under SEER monitoring."""
    with seer.monitor("hourly-sync", capture_logs=True):
        # Your sync logic here
        fetched = sync_data_from_api()
        count = len(fetched)
        print(f"Synced {count} records")

def daily_report():
    """Daily job: build the sales report and email it, tracked by SEER."""
    with seer.monitor("daily-report", capture_logs=True):
        # Your report generation logic
        send_email(generate_sales_report())
        print("Daily report sent")

def weekly_cleanup():
    """Weekly job: purge stale database records, tracked by SEER."""
    with seer.monitor("weekly-cleanup", capture_logs=True):
        # Cleanup logic
        removed_count = cleanup_old_records()
        print(f"Cleaned up {removed_count} old records")

# Create scheduler
# BackgroundScheduler runs jobs in worker threads within this process, so
# the main thread must be kept alive (see the sleep loop below).
scheduler = BackgroundScheduler()

# Schedule jobs
# 'interval' fires on a fixed period; 'cron' fires at wall-clock times.
scheduler.add_job(hourly_sync, 'interval', hours=1)
scheduler.add_job(daily_report, 'cron', hour=8, minute=0)
scheduler.add_job(weekly_cleanup, 'cron', day_of_week='sun', hour=2, minute=0)

# Start scheduler
scheduler.start()

try:
    # Keep the script running
    # Sleep in 60s slices so Ctrl+C is handled promptly.
    while True:
        time.sleep(60)
except (KeyboardInterrupt, SystemExit):
    # Graceful stop: lets in-flight jobs finish before exiting.
    scheduler.shutdown()
Windows Task Scheduler Integration
Monitor scheduled tasks on Windows
# windows_backup.py
from seerpy import Seer
import subprocess
import os
from datetime import datetime

seer = Seer(apiKey=os.getenv('SEER_API_KEY'))

def backup_files():
    """Mirror C:\\ImportantData into a dated folder under D:\\Backups via robocopy.

    Returns:
        str: the destination directory that was written.

    Raises:
        Exception: if robocopy reports a failure (exit code >= 8).
    """
    source = r'C:\ImportantData'
    # BUG FIX: the original r'D:\Backups\' is a SyntaxError — a raw string
    # literal cannot end with a backslash. Build the dated path with
    # os.path.join instead of string concatenation.
    destination = os.path.join(r'D:\Backups', datetime.now().strftime('%Y%m%d'))
    
    # Create destination if it doesn't exist
    os.makedirs(destination, exist_ok=True)
    
    # Run robocopy
    cmd = [
        'robocopy',
        source,
        destination,
        '/E',  # Copy subdirectories including empty ones
        '/Z',  # Copy files in restartable mode
        '/LOG:backup.log'
    ]
    
    result = subprocess.run(cmd, capture_output=True, text=True)
    
    # Robocopy return codes: 0-7 are success, 8+ are errors
    if result.returncode >= 8:
        raise Exception(f"Backup failed with code {result.returncode}")
    
    print(f"Backup completed to {destination}")
    return destination

# Run the file backup inside a SEER monitor; captured stdout (capture_logs)
# is reported under the "windows-backup" check, and any exception raised by
# backup_files() propagates out of the context manager.
with seer.monitor("windows-backup", capture_logs=True):
    destination = backup_files()
    print(f"Files backed up successfully to {destination}")

Creating Windows Task:

  1. Open Task Scheduler
  2. Create Basic Task
  3. Set trigger (daily, weekly, etc.)
  4. Action: Start a program
  5. Program: C:\Python39\python.exe
  6. Arguments: C:\scripts\windows_backup.py
  7. Set environment variable SEER_API_KEY in task settings
Best Practices for Scheduled Jobs
  • Always use environment variables for API keys
  • Log stdout and stderr to files for debugging
  • Set job-specific metadata (server name, environment)
  • Use capture_logs=True to track all output
  • Implement proper error handling and retries
  • Test manually before scheduling
  • Set up failure notifications in SEER dashboard
  • Monitor execution time trends