Apache Airflow Integration

Monitor Airflow DAGs and tasks with SEER
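
The examples below assume the SEER Python SDK is importable as seerpy and that the API key is available to every Airflow worker. A minimal setup sketch follows; the pip package name and the environment-variable name are assumptions, not something the examples themselves specify.

# Assumed install: pip install seerpy
import os
from seerpy import Seer

# Keep the key out of DAG files; SEER_API_KEY is an assumed variable name.
seer = Seer(apiKey=os.environ.get('SEER_API_KEY', 'YOUR_API_KEY'))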

Monitor Entire DAG Execution
Track DAG runs from start to finish
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

def extract_task(**context):
    """Extract data from source"""
    with seer.monitor("airflow-extract", 
                      metadata={"dag_id": context['dag'].dag_id,
                               "task_id": context['task_instance'].task_id}):
        # Your extraction logic
        data = fetch_data_from_api()
        context['ti'].xcom_push(key='raw_data', value=data)
        print(f"Extracted {len(data)} records")

def transform_task(**context):
    """Transform the data"""
    with seer.monitor("airflow-transform",
                      metadata={"dag_id": context['dag'].dag_id,
                               "task_id": context['task_instance'].task_id}):
        # Get data from previous task
        data = context['ti'].xcom_pull(key='raw_data', task_ids='extract')
        
        # Your transformation logic
        transformed = process_data(data)
        context['ti'].xcom_push(key='transformed_data', value=transformed)
        print(f"Transformed {len(transformed)} records")

def load_task(**context):
    """Load data to warehouse"""
    with seer.monitor("airflow-load",
                      metadata={"dag_id": context['dag'].dag_id,
                               "task_id": context['task_instance'].task_id}):
        # Get transformed data
        data = context['ti'].xcom_pull(key='transformed_data', task_ids='transform')
        
        # Your load logic
        load_to_warehouse(data)
        print(f"Loaded {len(data)} records to warehouse")

with DAG(
    'seer_monitored_etl',
    default_args=default_args,
    description='ETL pipeline monitored by SEER',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    
    # Airflow 2 passes the task context to the callable's **kwargs automatically,
    # so the deprecated provide_context argument is no longer needed.
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_task,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_task,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_task,
    )
    
    extract >> transform >> load
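
The three task callables above repeat the same seer.monitor boilerplate. A small decorator can factor that out; the sketch below reuses only the monitor context manager shown above, and the decorator itself is our own helper, not part of seerpy.

from functools import wraps

def seer_monitored(job_name):
    """Wrap an Airflow task callable in a seer.monitor block with DAG/task metadata."""
    def decorator(func):
        @wraps(func)
        def wrapper(**context):
            with seer.monitor(job_name,
                              metadata={"dag_id": context['dag'].dag_id,
                                        "task_id": context['task_instance'].task_id}):
                return func(**context)
        return wrapper
    return decorator

@seer_monitored("airflow-extract")
def extract_task(**context):
    data = fetch_data_from_api()  # placeholder extraction logic, as above
    context['ti'].xcom_push(key='raw_data', value=data)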
DAG-Level Monitoring with Callbacks
Monitor entire DAG runs with success/failure callbacks
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
import requests

def notify_seer_success(context):
    """Called when DAG succeeds"""
    dag_run = context['dag_run']
    
    requests.post(
        'https://api.seer.ansrstudio.com/monitoring',
        headers={'Authorization': 'YOUR_API_KEY'},
        json={
            'job_name': f"dag-{context['dag'].dag_id}",
            'status': 'success',
            'current_time': datetime.utcnow().isoformat() + 'Z',
            'metadata': {
                'dag_id': context['dag'].dag_id,
                'execution_date': str(dag_run.execution_date),
                'run_id': dag_run.run_id,
                'duration': str(dag_run.end_date - dag_run.start_date) if dag_run.end_date else None
            }
        }
    )

def notify_seer_failure(context):
    """Called when DAG fails"""
    dag_run = context['dag_run']
    exception = context.get('exception', 'Unknown error')
    
    requests.post(
        'https://api.seer.ansrstudio.com/monitoring',
        headers={'Authorization': 'YOUR_API_KEY'},
        json={
            'job_name': f"dag-{context['dag'].dag_id}",
            'status': 'failed',
            'current_time': datetime.utcnow().isoformat() + 'Z',
            'error_message': str(exception),
            'metadata': {
                'dag_id': context['dag'].dag_id,
                'execution_date': str(dag_run.execution_date),
                'run_id': dag_run.run_id,
                'failed_task': context['task_instance'].task_id
            }
        }
    )

with DAG(
    'monitored_dag_with_callbacks',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    on_success_callback=notify_seer_success,
    on_failure_callback=notify_seer_failure,
    catchup=False,
) as dag:
    
    # Your tasks here
    task1 = PythonOperator(
        task_id='task1',
        python_callable=your_function,
    )
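
The callbacks above fire once per DAG run. The same functions can also be attached per task through default_args, where the failure context additionally carries the raised exception; the sketch below reuses notify_seer_failure from above, and the DAG id and your_function are placeholders.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

default_args = {
    'owner': 'data-team',
    'on_failure_callback': notify_seer_failure,  # reuses the function defined above
}

with DAG(
    'monitored_dag_task_callbacks',  # illustrative DAG id
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    task1 = PythonOperator(
        task_id='task1',
        python_callable=your_function,  # placeholder callable, as above
    )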
Heartbeat Monitoring for Long-Running Tasks
Send periodic heartbeats during long tasks
from airflow.operators.python import PythonOperator
from seerpy import Seer
import time

seer = Seer(apiKey='YOUR_API_KEY')

def long_running_task(**context):
    """Process large dataset in batches"""
    job_name = f"airflow-batch-{context['dag'].dag_id}"
    batches = range(1, 101)  # 100 batches
    
    for batch_num in batches:
        # Send heartbeat every 10 batches
        if batch_num % 10 == 0:
            seer.heartbeat(job_name, metadata={
                "batch": batch_num,
                "progress": f"{batch_num}%",
                "dag_id": context['dag'].dag_id
            })
        
        # Process batch
        process_batch(batch_num)
        time.sleep(1)
    
    print(f"Completed all {len(batches)} batches")

task = PythonOperator(
    task_id='long_task',
    python_callable=long_running_task,
)
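
To have the scheduler run the task, attach the operator to a DAG instead of defining it at module level; a minimal wiring sketch follows (the DAG id and schedule are illustrative, and long_running_task is the function defined above).

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

with DAG(
    'seer_heartbeat_batch',  # illustrative DAG id
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:

    task = PythonOperator(
        task_id='long_task',
        python_callable=long_running_task,
    )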