Monitor Entire DAG Execution
Track DAG runs from start to finish
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')

default_args = {
    'owner': 'data-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}


def extract_task(**context):
    """Extract data from the source system"""
    with seer.monitor("airflow-extract",
                      metadata={"dag_id": context['dag'].dag_id,
                                "task_id": context['task_instance'].task_id}):
        # Your extraction logic
        data = fetch_data_from_api()
        context['ti'].xcom_push(key='raw_data', value=data)
        print(f"Extracted {len(data)} records")


def transform_task(**context):
    """Transform the data"""
    with seer.monitor("airflow-transform",
                      metadata={"dag_id": context['dag'].dag_id,
                                "task_id": context['task_instance'].task_id}):
        # Get data from the previous task
        data = context['ti'].xcom_pull(key='raw_data', task_ids='extract')

        # Your transformation logic
        transformed = process_data(data)
        context['ti'].xcom_push(key='transformed_data', value=transformed)
        print(f"Transformed {len(transformed)} records")


def load_task(**context):
    """Load data to the warehouse"""
    with seer.monitor("airflow-load",
                      metadata={"dag_id": context['dag'].dag_id,
                                "task_id": context['task_instance'].task_id}):
        # Get the transformed data
        data = context['ti'].xcom_pull(key='transformed_data', task_ids='transform')

        # Your load logic
        load_to_warehouse(data)
        print(f"Loaded {len(data)} records to warehouse")


with DAG(
    'seer_monitored_etl',
    default_args=default_args,
    description='ETL pipeline monitored by SEER',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Airflow 2 passes the task context to **context automatically,
    # so provide_context=True is no longer needed.
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_task,
    )

    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_task,
    )

    load = PythonOperator(
        task_id='load',
        python_callable=load_task,
    )

    extract >> transform >> load
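If every task follows this pattern, the seer.monitor boilerplate can be factored into a small decorator. The sketch below is a suggestion rather than part of the seerpy API: it reuses the seer client and seer.monitor context manager from the example above, and the seer_monitored helper name is illustrative.

import functools


def seer_monitored(job_name):
    """Run an Airflow callable inside a seer.monitor block."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(**context):
            with seer.monitor(job_name,
                              metadata={"dag_id": context['dag'].dag_id,
                                        "task_id": context['task_instance'].task_id}):
                return func(**context)
        return wrapper
    return decorator


@seer_monitored("airflow-extract")
def extract_task(**context):
    # Same extraction logic as above, now monitored by the decorator
    data = fetch_data_from_api()
    context['ti'].xcom_push(key='raw_data', value=data)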
DAG-Level Monitoring with Callbacks
Monitor entire DAG runs with success/failure callbacks
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

import requests
from seerpy import Seer

seer = Seer(apiKey='YOUR_API_KEY')


def notify_seer_success(context):
    """Called when the DAG run succeeds"""
    dag_run = context['dag_run']
    requests.post(
        'https://api.seer.ansrstudio.com/monitoring',
        headers={'Authorization': 'YOUR_API_KEY'},
        json={
            'job_name': f"dag-{context['dag'].dag_id}",
            'status': 'success',
            'current_time': datetime.utcnow().isoformat() + 'Z',
            'metadata': {
                'dag_id': context['dag'].dag_id,
                'execution_date': str(dag_run.execution_date),
                'run_id': dag_run.run_id,
                'duration': str(dag_run.end_date - dag_run.start_date) if dag_run.end_date else None
            }
        },
        timeout=10,  # don't let a slow endpoint hang the callback
    )


def notify_seer_failure(context):
    """Called when the DAG run fails"""
    dag_run = context['dag_run']
    exception = context.get('exception', 'Unknown error')
    requests.post(
        'https://api.seer.ansrstudio.com/monitoring',
        headers={'Authorization': 'YOUR_API_KEY'},
        json={
            'job_name': f"dag-{context['dag'].dag_id}",
            'status': 'failed',
            'current_time': datetime.utcnow().isoformat() + 'Z',
            'error_message': str(exception),
            'metadata': {
                'dag_id': context['dag'].dag_id,
                'execution_date': str(dag_run.execution_date),
                'run_id': dag_run.run_id,
                'failed_task': context['task_instance'].task_id
            }
        },
        timeout=10,
    )


with DAG(
    'monitored_dag_with_callbacks',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    on_success_callback=notify_seer_success,
    on_failure_callback=notify_seer_failure,
    catchup=False,
) as dag:
    # Your tasks here
    task1 = PythonOperator(
        task_id='task1',
        python_callable=your_function,
    )
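The same callback can also be attached per task through default_args, so a notification goes out as soon as an individual task instance finally fails (after its retries are exhausted), not only when the whole DAG run ends. A minimal sketch reusing notify_seer_failure from above; the DAG name is illustrative.

default_args = {
    # Fires once a task instance has ultimately failed (retries exhausted)
    'on_failure_callback': notify_seer_failure,
    'retries': 1,
}

with DAG(
    'monitored_dag_with_task_callbacks',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    task1 = PythonOperator(
        task_id='task1',
        python_callable=your_function,
    )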
Heartbeat Monitoring for Long-Running Tasks
Send periodic heartbeats during long tasks
from airflow.operators.python import PythonOperator
from seerpy import Seer
import time

seer = Seer(apiKey='YOUR_API_KEY')


def long_running_task(**context):
    """Process a large dataset in batches"""
    job_name = f"airflow-batch-{context['dag'].dag_id}"
    batches = range(1, 101)  # 100 batches

    for batch_num in batches:
        # Send a heartbeat every 10 batches
        if batch_num % 10 == 0:
            seer.heartbeat(job_name, metadata={
                "batch": batch_num,
                "progress": f"{batch_num}%",
                "dag_id": context['dag'].dag_id
            })

        # Process the batch
        process_batch(batch_num)
        time.sleep(1)

    print(f"Completed all {len(batches)} batches")


# Define the task inside your with DAG(...) block so it is scheduled;
# Airflow 2 passes the context automatically, so provide_context is not needed.
task = PythonOperator(
    task_id='long_task',
    python_callable=long_running_task,
)
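Heartbeats go over the network, and a transient monitoring error usually should not fail an otherwise healthy batch job. A minimal sketch, assuming the same seer.heartbeat call shown above; the safe_heartbeat helper name is illustrative, not part of seerpy.

def safe_heartbeat(job_name, **metadata):
    """Send a heartbeat, but never let a monitoring error break the task."""
    try:
        seer.heartbeat(job_name, metadata=metadata)
    except Exception as exc:
        # Monitoring is best-effort: log and keep processing
        print(f"SEER heartbeat failed (continuing): {exc}")

Inside the loop, call safe_heartbeat(job_name, batch=batch_num, dag_id=context['dag'].dag_id) in place of the direct seer.heartbeat call.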