Apache Airflow for Data Pipelines: DAGs, Operators, XCom & Monitoring

A complete Apache Airflow tutorial for data pipeline orchestration, covering DAG concepts, operators, XCom, sensors, pools, and backfill, through to monitoring and best practices

1. Introduction to Airflow

Apache Airflow is an open-source platform for scheduling and managing workflows (data pipelines). Pipelines are defined as Python code, which makes them easy to version-control and test.

Why Airflow?

Feature | Description
Pipeline as Code | Pipelines are defined in Python and can be version-controlled
Scheduling | Runs pipelines automatically (cron-based)
Monitoring | Web UI for viewing status, logs, and retries
Scalable | From a single DAG to thousands of DAGs
Extensible | Hundreds of providers: AWS, GCP, databases, APIs, and more
Error Handling | Retries, alerting, and dependency management

Diagram: Airflow Architecture
┌──────────────────────────────────────────────────────────┐
│                   AIRFLOW ARCHITECTURE                   │
│                                                          │
│  ┌──────────┐     ┌──────────┐     ┌──────────┐          │
│  │  Web UI  │     │Scheduler │     │ Metadata │          │
│  │(Monitor) │     │(Trigger) │     │    DB    │          │
│  └────┬─────┘     └────┬─────┘     └────┬─────┘          │
│       │                │                │                │
│       └────────────────┼────────────────┘                │
│                        │                                 │
│            ┌───────────┼───────────┐                     │
│            │           │           │                     │
│        ┌───┴────┐  ┌───┴────┐  ┌───┴────┐                │
│        │Worker 1│  │Worker 2│  │Worker 3│                │
│        └────────┘  └────────┘  └────────┘                │
│        Execute tasks in parallel                         │
└──────────────────────────────────────────────────────────┘

2. Setup & Installation

Bash: Installing Airflow
# Install Airflow with pip
# (use a constraints file to get compatible dependency versions)

AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Install commonly used providers (the same constraints file keeps versions compatible)
pip install apache-airflow-providers-google --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-amazon --constraint "${CONSTRAINT_URL}"
pip install apache-airflow-providers-postgres --constraint "${CONSTRAINT_URL}"

# Initialize the metadata database
airflow db migrate

# Create an admin user (example credentials; use a strong password outside local testing)
airflow users create \
  --username admin \
  --firstname Admin \
  --lastname User \
  --role Admin \
  --email admin@beebane.com \
  --password admin123

# Start the webserver and the scheduler
airflow webserver --port 8080 &
airflow scheduler &

# Open http://localhost:8080

# Or use Docker (recommended)
# curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# docker compose up -d

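3. DAGs: Directed Acyclic Graphs

A DAG is the definition of a pipeline in Airflow. Each DAG consists of tasks connected by dependencies, which determine the execution order. It is called "acyclic" because the dependencies must never form a loop: no task may depend, directly or indirectly, on itself.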
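
To make the "acyclic" requirement concrete, here is a minimal sketch that uses Python's standard-library graphlib module (Python 3.9+) rather than Airflow itself. The task names mirror the ETL example in this section; the dictionary layout (each task mapped to the set of tasks it depends on) and the helper has_cycle are assumptions made for illustration.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks that must finish before it starts.
# Task names are illustrative; this is plain Python, not the Airflow API.
dependencies = {
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}

# A valid DAG has at least one execution order that respects every dependency.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)


def has_cycle(graph):
    """Return True if the dependency graph contains a loop."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return True
    return False


# Making "extract" depend on "notify" closes the loop, so no valid order exists.
cyclic = {**dependencies, "extract": {"notify"}}
print(has_cycle(dependencies), has_cycle(cyclic))
```

A scheduler can only decide what to run next when such an ordering exists, which is why a loop in the task graph has to be rejected.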

Python: Basic DAG
# =============================================
# File: dags/etlpipeline.py
# Save it in ~/airflow/dags/
# =============================================
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

# Default arguments applied to every task in the DAG
default_args = {
    "owner": "beebane",
    "depends_on_past": False,
    "email": ["alert@beebane.com"],
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    dag_id="etl_penjualan_harian",
    default_args=default_args,
    description="ETL pipeline for daily sales data",
    schedule="0 6 * * *",       # Every day at 06:00
    start_date=datetime(2026, 1, 1),
    catchup=False,              # Do not backfill automatically
    tags=["etl", "penjualan", "production"],
    max_active_runs=1,          # Only 1 active run at a time
)

# ----- Define Tasks -----
def extract_data(**context):
    """Extract data from the source."""
    import pandas as pd
    df = pd.read_csv("data/penjualan_raw.csv")
    print(f"Extracted {len(df)} records")
    # Save to a temporary file for the next task
    # (assumes all tasks run on the same machine)
    df.to_csv("/tmp/extracted.csv", index=False)
    return len(df)

def transform_data(**context):
    """Transform and clean the data."""
    import pandas as pd
    df = pd.read_csv("/tmp/extracted.csv")
    
    # Cleaning
    df = df.dropna()
    df["total"] = df["harga"] * df["jumlah"]
    df["tanggal"] = pd.to_datetime(df["tanggal"])
    
    df.to_csv("/tmp/transformed.csv", index=False)
    print(f"Transformed: {len(df)} clean records")

def load_data(**context):
    """Load the data into the target database."""
    import pandas as pd
    df = pd.read_csv("/tmp/transformed.csv")
    # df.to_sql("penjualan", engine, if_exists="append")
    print(f"Loaded {len(df)} records to database")

# Task definitions
task_extract = PythonOperator(
    task_id="extract",
    python_callable=extract_data,
    dag=dag,
)

task_transform = PythonOperator(
    task_id="transform",
    python_callable=transform_data,
    dag=dag,
)

task_load = PythonOperator(
    task_id="load",
    python_callable=load_data,
    dag=dag,
)

task_notify = BashOperator(
    task_id="notify",
    bash_command='echo "ETL pipeline selesai pada $(date)"',
    dag=dag,
)

# ----- Dependencies -----
# Execution order: extract -> transform -> load -> notify
task_extract >> task_transform >> task_load >> task_notify
# Or: task_extract.set_downstream(task_transform)
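
Because the pipeline is ordinary Python, the cleaning logic can live in a function that does not depend on Airflow and can be checked before it is wired into a PythonOperator. The following is a minimal sketch under stated assumptions: rows are dictionaries with the harga, jumlah, and tanggal columns used above, the function name clean_rows is hypothetical, the sample rows are made up, and the standard library stands in for pandas.

```python
from datetime import date


def clean_rows(rows):
    """Drop incomplete rows, parse the date, and add a `total` field.

    Mirrors the cleaning steps of transform_data, without pandas.
    """
    cleaned = []
    for row in rows:
        # Rough equivalent of dropna(): skip rows with any missing value.
        if any(value is None or value == "" for value in row.values()):
            continue
        harga = float(row["harga"])
        jumlah = int(row["jumlah"])
        cleaned.append(
            {
                "tanggal": date.fromisoformat(row["tanggal"]),
                "harga": harga,
                "jumlah": jumlah,
                "total": harga * jumlah,
            }
        )
    return cleaned


# Made-up sample rows, as they might come out of a CSV reader.
sample = [
    {"tanggal": "2026-01-01", "harga": "15000", "jumlah": "2"},
    {"tanggal": "2026-01-01", "harga": "", "jumlah": "1"},  # incomplete row
]
result = clean_rows(sample)
print(result)
```

Keeping the logic in a plain function like this means the task callable only has to read the input, call the function, and write the output.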

4. Operators

An operator is the building block of a task in Airflow. Each operator performs one kind of action.

Commonly Used Operators

Operator | Purpose | Example use
PythonOperator | Runs a Python function | ETL, data processing
BashOperator | Runs a bash command | Scripts, CLI tools
PostgresOperator | Runs a SQL query | Insert, update, create table
S3 operators (Amazon provider) | Upload to/download from S3 | Back up data to S3
EmailOperator | Sends an email | Completion notification
DockerOperator | Runs a Docker container | Isolated execution
KubernetesPodOperator | Runs a pod on Kubernetes | Scalable tasks

Python: Various Operators
# =============================================
# Operator Examples
# =============================================
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.operators.empty import EmptyOperator

# PythonOperator
def process_data(**context):
    ti = context["ti"]
    # Assumes the upstream "extract" task returned a list of numbers via XCom
    data = ti.xcom_pull(task_ids="extract")
    processed = [x * 2 for x in data]
    return processed

task_process = PythonOperator(
    task_id="process",
    python_callable=process_data,
    dag=dag,
)

# PostgresOperator
task_create_table = PostgresOperator(
    task_id="create_table",
    postgres_conn_id="my_postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS sales (
            id SERIAL PRIMARY KEY,
            tanggal DATE,
            total DECIMAL(10,2)
        );
    """,
    dag=dag,
)

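# BranchPythonOperator (conditional)
def choose_branch(**context):
    # Use the run's logical date instead of today's date so that
    # re-runs and backfills pick the same branch (Airflow 2.2+)
    if context["logical_date"].weekday() == 0:  # Monday
        return "weekly_report"
    return "daily_report"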

branch = BranchPythonOperator(
    task_id="choose_branch",
    python_callable=choose_branch,
    dag=dag,
)

daily = EmptyOperator(task_id="daily_report", dag=dag)
weekly = EmptyOperator(task_id="weekly_report", dag=dag)

branch >> [daily, weekly]
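
The branching rule itself can be kept in a small helper so it can be checked for any date without starting Airflow. This is only a sketch: pick_report is a hypothetical name, and the returned strings are the task_id values of the two EmptyOperator tasks above.

```python
from datetime import date


def pick_report(run_date: date) -> str:
    """Return the task_id that the branch should follow for a given run date."""
    # date.weekday() returns 0 for Monday through 6 for Sunday.
    return "weekly_report" if run_date.weekday() == 0 else "daily_report"


print(pick_report(date(2026, 1, 5)))  # 2026-01-05 is a Monday
print(pick_report(date(2026, 1, 6)))  # 2026-01-06 is a Tuesday
```

The callable given to BranchPythonOperator could then delegate to this helper, passing in the date of the run.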

5. XCom: Cross-Communication

XCom (cross-communication) lets tasks share small pieces of data with one another.

Python: XCom Communication
# =============================================
# XCom: Task Communication
# =============================================
import pandas as pd

# Push data: the return value automatically becomes an XCom
def extract(**context):
    data = {"records": 1500, "date": "2026-06-29"}
    return data  # Pushed automatically under the key "return_value"

# Pull data
def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract", key="return_value")
    print(f"Received {data['records']} records")
    
    # Manual push: store the transform result under a custom key
    ti.xcom_push(key="transform_result", value={"clean_records": 1450})

# Pull a value that was pushed under a custom key
def load(**context):
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="transform", key="transform_result")
    print(f"Loading {result['clean_records']} records")

# ⚠️ XCom is not for large data! Values are stored in the metadata database,
# so keep them small (Airflow 1.x capped them at about 48 KB; in 2.x the
# practical limit depends on the database backend)
# For large data: save to a file/S3 and pass only the path via XCom
def extract_large(**context):
    df = pd.read_csv("large_file.csv")
    path = "/tmp/extracted.parquet"
    df.to_parquet(path)  # Requires a parquet engine such as pyarrow
    return path  # Pass only the file path
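
The "small values inline, large values by path" rule can be sketched without Airflow as follows. The helper xcom_payload and the MAX_INLINE_BYTES threshold are hypothetical names chosen for this illustration; the threshold is not an Airflow setting, and JSON files stand in for whatever storage (local disk, S3) a real pipeline would use.

```python
import json
import os
import tempfile

# Illustrative threshold only; this is not an Airflow configuration value.
MAX_INLINE_BYTES = 48 * 1024


def xcom_payload(records, spill_dir):
    """Return the records inline if small, otherwise write a file and return its path."""
    encoded = json.dumps(records).encode("utf-8")
    if len(encoded) <= MAX_INLINE_BYTES:
        return {"kind": "inline", "data": records}
    # Too large to pass around: persist it and hand over only the location.
    fd, path = tempfile.mkstemp(suffix=".json", dir=spill_dir)
    with os.fdopen(fd, "wb") as handle:
        handle.write(encoded)
    return {"kind": "path", "path": path}


spill_dir = tempfile.mkdtemp()
small = xcom_payload([{"id": 1}], spill_dir)
large = xcom_payload([{"id": i} for i in range(20_000)], spill_dir)
print(small["kind"], large["kind"])
```

A task would return the resulting dictionary, and the downstream task would check the kind field to decide whether to read the data directly or open the file.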

6. Sensors & Triggers

A sensor is a special kind of task that waits until a given condition is met before the pipeline continues.

Python: Sensors
# =============================================
# Sensors
# =============================================
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.sensors.sql import SqlSensor
from airflow.sensors.python import PythonSensor

# FileSensor: wait for a file to appear
wait_for_file = FileSensor(
    task_id="wait_for_data_file",
    filepath="/data/incoming/sales_{{ ds }}.csv",
    fs_conn_id="fs_default",  # Filesystem connection (FileSensor has no fs_type argument)
    poke_interval=30,   # Check every 30 seconds
    timeout=3600,       # Time out after 1 hour
    mode="reschedule",  # Release the worker slot while waiting
    dag=dag,
)

# ExternalTaskSensor: wait for a task in another DAG to finish
wait_for_other_dag = ExternalTaskSensor(
    task_id="wait_for_extract_dag",
    external_dag_id="extract_pipeline",
    external_task_id="load",
    timeout=7200,
    dag=dag,
)

# PythonSensor: custom condition
def check_api_available():
    import requests
    try:
        r = requests.get("https://api.example.com/health", timeout=5)
        return r.status_code == 200
    except requests.RequestException:
        # Network errors mean "not available yet"; other errors still surface
        return False

wait_for_api = PythonSensor(
    task_id="wait_for_api",
    python_callable=check_api_available,
    poke_interval=60,
    timeout=1800,
    dag=dag,
)
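
How poke_interval and timeout interact can be illustrated with a small polling loop. This is a simplified sketch of the idea, not Airflow's implementation: wait_until is a hypothetical helper, and the clock and sleep functions are injected so the example runs instantly.

```python
import time


def wait_until(condition, poke_interval, timeout, sleep=time.sleep, clock=time.monotonic):
    """Call `condition` every `poke_interval` seconds until it returns True.

    Raises TimeoutError once `timeout` seconds have elapsed without success.
    """
    started = clock()
    while True:
        if condition():
            return True
        if clock() - started >= timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)


# Simulated condition: it becomes true on the third check.
attempts = []


def file_has_arrived():
    attempts.append(1)
    return len(attempts) >= 3


# A fake clock and a fake sleep keep the example instant.
fake_now = [0.0]


def fake_sleep(seconds):
    fake_now[0] += seconds


ok = wait_until(
    file_has_arrived,
    poke_interval=30,
    timeout=3600,
    sleep=fake_sleep,
    clock=lambda: fake_now[0],
)
print(ok, len(attempts), fake_now[0])
```

A loop like this holds on to its worker for the whole wait, which corresponds to the default "poke" mode; mode="reschedule" instead frees the worker slot between checks.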

7. Pools, Priority & Concurrency

Python: Pools & Priority
# =============================================
# Pools & Priority
# =============================================

# Pool: limit how many tasks use a given resource at the same time
# Create the pool via the CLI:
# airflow pools set db_pool 5 "Pool for database tasks"

task_db = PythonOperator(
    task_id="db_operation",
    python_callable=my_func,
    pool="db_pool",         # Run inside this pool
    pool_slots=1,           # Occupy 1 slot
    priority_weight=10,     # Higher weight is scheduled first
    dag=dag,
)

# DAG-level concurrency settings
dag = DAG(
    "my_dag",
    max_active_runs=2,              # At most 2 concurrent runs of this DAG
    max_active_tasks=10,            # At most 10 concurrently running tasks of this DAG
    # `concurrency` is the deprecated (pre-2.2) name for max_active_tasks;
    # do not pass both
)

# Task-level settings
task = PythonOperator(
    task_id="heavy_task",
    python_callable=heavy_func,
    retries=3,
    retry_delay=timedelta(minutes=10),
    execution_timeout=timedelta(hours=1),
    queue="heavy",                  # Worker queue
    dag=dag,
)
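
A pool behaves much like a counting semaphore: a task must acquire a slot before it runs and gives it back when it finishes. The sketch below illustrates that idea with threads from the standard library; it is not how Airflow implements pools, and POOL_SLOTS, db_task, and the sleep duration are assumptions for the demonstration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

POOL_SLOTS = 2  # plays the role of the slot count given to `airflow pools set`
pool = threading.BoundedSemaphore(POOL_SLOTS)

lock = threading.Lock()
state = {"running": 0, "peak": 0}  # tracks how many tasks hold a slot


def db_task(task_number):
    """Simulated task that must hold a pool slot while it runs."""
    with pool:  # blocks until one of the slots is free
        with lock:
            state["running"] += 1
            state["peak"] = max(state["peak"], state["running"])
        time.sleep(0.05)  # stand-in for the actual database work
        with lock:
            state["running"] -= 1
    return task_number


# Six tasks are ready at once, but at most POOL_SLOTS of them run at any moment.
with ThreadPoolExecutor(max_workers=6) as executor:
    finished = sorted(executor.map(db_task, range(6)))

print(finished, state["peak"])
```

The recorded peak never exceeds POOL_SLOTS, even though all six tasks were submitted together.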

8. Backfill & Catchup

Bash: Backfill Commands
# =============================================
# Backfill & Catchup
# =============================================

# Backfill: run a DAG for a specific date range
airflow dags backfill \
  -s 2026-01-01 \
  -e 2026-06-01 \
  etl_penjualan_harian

# Catchup: automatically run every schedule interval that was missed
# Configured in the DAG definition (Python, not a shell command):
dag = DAG(
    "my_dag",
    start_date=datetime(2026, 1, 1),
    catchup=True,    # Run every interval since start_date
)

# Disable catchup in production:
dag = DAG(
    "my_dag",
    start_date=datetime(2026, 1, 1),
    catchup=False,   # Only run the most recent interval
)
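
The difference between catchup=True and catchup=False can be sketched by listing the daily intervals a scheduler would have to cover. This is a simplified model of Airflow 2.x behaviour, assuming a daily schedule and ignoring time zones and cron alignment; daily_intervals is a hypothetical helper and the dates are examples.

```python
from datetime import datetime, timedelta


def daily_intervals(start_date, now, catchup):
    """List the (start, end) data intervals a daily DAG would run for.

    Simplified model: a run is created only once its interval has ended.
    """
    intervals = []
    start = start_date
    while start + timedelta(days=1) <= now:
        intervals.append((start, start + timedelta(days=1)))
        start += timedelta(days=1)
    # With catchup disabled, only the most recent completed interval is run.
    return intervals if catchup else intervals[-1:]


start = datetime(2026, 1, 1, 6, 0)
now = datetime(2026, 1, 5, 9, 30)

with_catchup = daily_intervals(start, now, catchup=True)
without_catchup = daily_intervals(start, now, catchup=False)
print(len(with_catchup), without_catchup)
```

With a start date months in the past, the catchup=True list grows to one run per missed day, which is why it is usually disabled unless historical runs are actually wanted.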

9. Monitoring & Best Practices

💡 Airflow Best Practices
  • Idempotent tasks: a task can be re-run and still produce the same result, with no duplicated side effects
  • Small data via XCom: for large data, save a file and pass only its path
  • Atomic tasks: one task = one responsibility
  • Use templates: use Jinja templates for dynamic values
  • Version control DAGs: keep them in Git
  • Alerting: use email/Slack for failure notifications
  • Test DAGs: run airflow tasks test before deploying
  • Avoid top-level code: do not run heavy code when the DAG file is imported
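
As an illustration of the first point, an idempotent load replaces the data for its run date instead of appending to it, so a retry leaves the table unchanged. The sketch below uses the standard-library sqlite3 module with an in-memory database; the helper load_partition, the simplified sales table, and the sample values are assumptions for this example.

```python
import sqlite3


def load_partition(conn, run_date, totals):
    """Replace all rows for `run_date` so that re-running gives the same table."""
    with conn:  # one transaction: commit on success, roll back on error
        conn.execute("DELETE FROM sales WHERE tanggal = ?", (run_date,))
        conn.executemany(
            "INSERT INTO sales (tanggal, total) VALUES (?, ?)",
            [(run_date, total) for total in totals],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (tanggal TEXT, total REAL)")

# Running the same load twice (for example after a retry) must not duplicate rows.
load_partition(conn, "2026-01-01", [30000.0, 12500.0])
load_partition(conn, "2026-01-01", [30000.0, 12500.0])

row_count = conn.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
print(row_count)
```

A plain append (such as if_exists="append" without a prior delete) would instead add the same rows again on every retry or backfill.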

10. Comprehension Quiz

1. What is a DAG in Airflow?

2. What is XCom used for?

3. What is the difference between an Operator and a Sensor?

4. What is a backfill?

5. Why should tasks be idempotent?

Summary

📝 Key Points
  • Airflow: an orchestration platform for scheduling and managing data pipelines
  • DAG: a pipeline defined as Python code, with dependencies between tasks
  • Operators: the building blocks, such as Python, Bash, SQL, Email, and Docker
  • XCom: communication between tasks (small data only)
  • Sensors: wait for a condition, such as a file, an API, or a database
  • Best practices: idempotent and atomic tasks, version control