1. Introduction to Airflow
Apache Airflow is an open-source platform for scheduling and managing workflows (data pipelines). Airflow uses Python to define pipelines as code, which makes version control and testing easier.
Why Airflow?
| Feature | Description |
|---|---|
| Pipelines as code | Pipelines are defined in Python and can be kept under version control |
| Scheduling | Runs pipelines automatically on a schedule (e.g. cron expressions) |
| Monitoring | Web UI for viewing status and logs and for retrying tasks |
| Scalability | From a single DAG to thousands of DAGs |
| Extensibility | Hundreds of provider packages: AWS, GCP, databases, APIs, etc. |
| Error handling | Retries, alerting, and dependency management |
Airflow architecture (diagram): the Web UI (monitoring), the Scheduler (triggering runs) and the Metadata DB are connected to a set of workers (Worker 1, Worker 2, Worker 3) that execute tasks in parallel.
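To make the diagram concrete, the sketch below is a toy model in plain Python, not Airflow code: a "scheduler" puts task ids on a queue, three worker threads pull and run them in parallel, and a dict stands in for the metadata DB. All names are hypothetical; in a real deployment the executor handles the queue (for example a Celery broker with CeleryExecutor) and workers are usually separate processes or machines.

```python
import queue
import threading


def run_with_workers(task_ids, num_workers=3):
    """Toy model of the diagram: a 'scheduler' queues tasks and worker threads run them in parallel."""
    task_queue = queue.Queue()
    metadata_db = {}  # stands in for the metadata DB: task_id -> (state, worker name)
    lock = threading.Lock()

    def worker(name):
        while True:
            task_id = task_queue.get()
            if task_id is None:  # sentinel: the scheduler has no more work for this worker
                break
            # A real worker would execute the task's code here
            with lock:
                metadata_db[task_id] = ("success", name)

    workers = [
        threading.Thread(target=worker, args=(f"worker_{i}",))
        for i in range(1, num_workers + 1)
    ]
    for w in workers:
        w.start()
    # Scheduler side: enqueue every task, then one sentinel per worker
    for task_id in task_ids:
        task_queue.put(task_id)
    for _ in workers:
        task_queue.put(None)
    for w in workers:
        w.join()
    return metadata_db


states = run_with_workers(["extract_a", "extract_b", "extract_c", "extract_d"])
print(sorted(states))  # ['extract_a', 'extract_b', 'extract_c', 'extract_d']
```

Because all tasks are enqueued before the sentinels, a worker only sees a sentinel once every task has been handed out, so no task is lost.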
2. Setup & Installation
# Install Airflow with pip
# (use a constraints file so dependency versions are compatible with this Airflow release)
AIRFLOW_VERSION=2.9.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
# Install commonly used providers
pip install apache-airflow-providers-google
pip install apache-airflow-providers-amazon
pip install apache-airflow-providers-postgres
# Initialize (migrate) the metadata database
airflow db migrate
# Create an admin user (change the example password for anything beyond local testing)
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@beebane.com \
--password admin123
# Start the webserver and scheduler in the background
airflow webserver --port 8080 &
airflow scheduler &
# Then open http://localhost:8080
# Alternatively, use Docker Compose (handy for local development; the official quick-start file is not meant for production)
# curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# docker compose up -d
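The constraint URL in the shell snippet is just a string pattern built from the Airflow version and the Python "major.minor" version. A small Python equivalent (the helper name constraint_url is hypothetical) makes the pattern explicit:

```python
import sys


def constraint_url(airflow_version, python_version=None):
    """Build the constraints-file URL used by the pip command above."""
    if python_version is None:
        # Same "major.minor" value the shell snippet extracts from `python --version`
        python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


print(constraint_url("2.9.3", "3.11"))
# https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt
```

Both versions matter: the constraints file for a given Airflow/Python combination pins dependency versions that were tested together with that Airflow release.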
3. DAGs: Directed Acyclic Graphs
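A DAG is how a pipeline is defined in Airflow. Each DAG consists of tasks connected by dependencies, which determine the execution order. It is called "acyclic" because it must not contain loops: following the dependencies from any task never leads back to that same task.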
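The "acyclic" rule can be checked with Python's standard graphlib module (Python 3.9+). This sketch is not how Airflow validates DAGs internally; it only shows that the extract, transform, load, notify chain used in the example below has a valid execution order, while adding a back edge creates a cycle that no scheduler could order.

```python
from graphlib import CycleError, TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks)
etl = {
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}
order = list(TopologicalSorter(etl).static_order())
print(order)  # ['extract', 'transform', 'load', 'notify']

# A back edge (extract depends on notify) turns the chain into a loop
cyclic = dict(etl, extract={"notify"})
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)  # True
```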
# =============================================
# File: dags/etlpipeline.py
# Save it in ~/airflow/dags/ (the default DAGs folder)
# =============================================
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# Default arguments applied to every task in the DAG
default_args = {
    "owner": "beebane",
    "depends_on_past": False,
    "email": ["alert@beebane.com"],
    "email_on_failure": True,  # sending email requires SMTP to be configured
    "email_on_retry": False,
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}
# Define the DAG
dag = DAG(
    dag_id="etl_penjualan_harian",
    default_args=default_args,
    description="ETL pipeline for daily sales data",
    schedule="0 6 * * *",  # every day at 06:00 (naive datetimes are interpreted as UTC by default)
    start_date=datetime(2026, 1, 1),
    catchup=False,  # don't automatically create runs for past intervals
    tags=["etl", "penjualan", "production"],
    max_active_runs=1,  # at most 1 active run at a time
)
# ----- Task callables -----
def extract_data(**context):
    """Extract data from the source."""
    import pandas as pd  # imported inside the function to keep DAG parsing fast
    df = pd.read_csv("data/penjualan_raw.csv")  # relative to the worker's working directory
    print(f"Extracted {len(df)} records")
    # Save to a temporary file (/tmp is only shared if all tasks run on the same machine)
    df.to_csv("/tmp/extracted.csv", index=False)
    return len(df)  # the return value is pushed to XCom
def transform_data(**context):
    """Transform and clean the data."""
    import pandas as pd
    df = pd.read_csv("/tmp/extracted.csv")
    # Cleaning: drop rows with missing values, then derive columns
    df = df.dropna()
    df["total"] = df["harga"] * df["jumlah"]  # total = price (harga) x quantity (jumlah)
    df["tanggal"] = pd.to_datetime(df["tanggal"])
    df.to_csv("/tmp/transformed.csv", index=False)
    print(f"Transformed: {len(df)} clean records")
def load_data(**context):
    """Load into the target database."""
    import pandas as pd
    df = pd.read_csv("/tmp/transformed.csv")
    # e.g. df.to_sql("penjualan", engine, if_exists="append"), where engine is a SQLAlchemy engine
    print(f"Loaded {len(df)} records to database")
# Task definitions
task_extract = PythonOperator(
    task_id="extract",
    python_callable=extract_data,
    dag=dag,
)
task_transform = PythonOperator(
    task_id="transform",
    python_callable=transform_data,
    dag=dag,
)
task_load = PythonOperator(
    task_id="load",
    python_callable=load_data,
    dag=dag,
)
task_notify = BashOperator(
    task_id="notify",
    bash_command='echo "ETL pipeline finished at $(date)"',
    dag=dag,
)
# ----- Dependencies -----
# Run order: extract -> transform -> load -> notify
task_extract >> task_transform >> task_load >> task_notify
# Equivalent: task_extract.set_downstream(task_transform), and so on
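The >> syntax works through Python operator overloading. The minimal Task class below is hypothetical (it is not Airflow's BaseOperator), but it shows the mechanism behind chains like a >> b >> c and list forms such as [a, b] >> c, which Airflow also supports:

```python
class Task:
    """Hypothetical minimal task that records dependencies, mimicking the >> syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()
        self.downstream = set()

    def set_downstream(self, other):
        # Accept one task or a list of tasks (fan-out, e.g. a >> [b, c])
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            self.downstream.add(t.task_id)
            t.upstream.add(self.task_id)

    def __rshift__(self, other):
        self.set_downstream(other)
        return other  # returning the right operand is what lets a >> b >> c chain

    def __rrshift__(self, other):
        # [a, b] >> c: a list has no >>, so Python calls c.__rrshift__([a, b]) (fan-in)
        for t in other:
            t.set_downstream(self)
        return self


extract, transform, load, notify = (Task(n) for n in ["extract", "transform", "load", "notify"])
extract >> transform >> load >> notify
print(load.upstream, load.downstream)  # {'transform'} {'notify'}
```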
4. Operators
An operator is the building block of a task in Airflow: each operator performs one kind of action, and a task is an instance of an operator configured with specific arguments.
Commonly Used Operators
| Operator | Purpose | Example use |
|---|---|---|
| PythonOperator | Runs a Python function | ETL, data processing |
| BashOperator | Runs a bash command | Scripts, CLI tools |
| PostgresOperator | Runs SQL against PostgreSQL | Insert, update, create table |
| S3 operators (e.g. S3CreateObjectOperator, LocalFilesystemToS3Operator) | Write or upload objects to S3 | Backing up data to S3 |
| EmailOperator | Sends an email | Completion notification |
| DockerOperator | Runs a Docker container | Isolated execution |
| KubernetesPodOperator | Runs a pod on Kubernetes | Scalable tasks |
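The idea that each operator encapsulates one kind of action can be sketched with a small class hierarchy. The class names below are hypothetical and only mimic the pattern; in Airflow itself, a custom operator subclasses BaseOperator and overrides execute(self, context).

```python
class BaseOperatorSketch:
    """Hypothetical stand-in for an operator: configured in __init__, does its work in execute()."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError("each operator implements its own action")


class PythonOperatorSketch(BaseOperatorSketch):
    """Action: call a Python function, passing the context as keyword arguments."""

    def __init__(self, task_id, python_callable):
        super().__init__(task_id)
        self.python_callable = python_callable

    def execute(self, context):
        return self.python_callable(**context)


class EmptyOperatorSketch(BaseOperatorSketch):
    """Action: nothing; useful as a placeholder or join point in the graph."""

    def execute(self, context):
        return None


def double(**context):
    return context["value"] * 2


task = PythonOperatorSketch("double", python_callable=double)
print(task.execute({"value": 21}))  # 42
```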
# =============================================
# Operator Examples
# =============================================
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
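# PostgresOperator is deprecated in newer Postgres provider releases in favor of
# SQLExecuteQueryOperator (airflow.providers.common.sql.operators.sql)
from airflow.providers.postgres.operators.postgres import PostgresOperator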
from airflow.operators.empty import EmptyOperator
# PythonOperator
def process_data(**context):
    ti = context["ti"]
    # Assumes the upstream "extract" task returned a list of numbers
    data = ti.xcom_pull(task_ids="extract")
    processed = [x * 2 for x in data]
    return processed
task_process = PythonOperator(
    task_id="process",
    python_callable=process_data,
    dag=dag,
)
# PostgresOperator
task_create_table = PostgresOperator(
    task_id="create_table",
    postgres_conn_id="my_postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS sales (
            id SERIAL PRIMARY KEY,
            tanggal DATE,
            total DECIMAL(10,2)
        );
    """,
    dag=dag,
)
# BranchPythonOperator (conditional)
def choose_branch(**context):
    # Decide from the run's logical date (not today's date) so reruns and backfills pick the same branch
    if context["logical_date"].weekday() == 0:  # Monday
        return "weekly_report"
    return "daily_report"
branch = BranchPythonOperator(
    task_id="choose_branch",
    python_callable=choose_branch,
    dag=dag,
)
daily = EmptyOperator(task_id="daily_report", dag=dag)
weekly = EmptyOperator(task_id="weekly_report", dag=dag)
branch >> [daily, weekly]
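What a branch task does can be modeled in plain Python: the callable returns the task_id to follow, and the other direct downstream tasks are skipped. This is a simplified, hypothetical model (resolve_branch is not an Airflow function). Passing the date in, instead of calling date.today(), keeps the decision reproducible when a run is retried or backfilled.

```python
from datetime import date

# Direct downstream task ids of the branch task (same names as the example above)
BRANCH_TARGETS = ("daily_report", "weekly_report")


def choose_branch(logical_date):
    """Return the task_id to follow: Monday runs get the weekly report."""
    return "weekly_report" if logical_date.weekday() == 0 else "daily_report"


def resolve_branch(chosen):
    """The chosen target runs; every other direct target is marked skipped."""
    if chosen not in BRANCH_TARGETS:
        raise ValueError(f"unknown branch target: {chosen}")
    return {t: ("run" if t == chosen else "skipped") for t in BRANCH_TARGETS}


print(resolve_branch(choose_branch(date(2026, 6, 29))))  # 2026-06-29 is a Monday
# {'daily_report': 'skipped', 'weekly_report': 'run'}
```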
5. XCom: Cross-Communication
XCom (cross-communication) lets tasks share small pieces of data with each other.
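Conceptually, XCom is a small key-value table in the metadata database, scoped to a DAG run. The class below is a hypothetical in-memory model of that idea, not Airflow's XCom API; it shows the default "return_value" key and why a value pushed in one run is not visible to another run.

```python
class XComStoreSketch:
    """Toy XCom table: small values keyed by (dag_id, run_id, task_id, key)."""

    def __init__(self):
        self._rows = {}

    def push(self, dag_id, run_id, task_id, value, key="return_value"):
        # A task's return value is stored under the key "return_value"
        self._rows[(dag_id, run_id, task_id, key)] = value

    def pull(self, dag_id, run_id, task_id, key="return_value"):
        # Missing entries come back as None instead of raising
        return self._rows.get((dag_id, run_id, task_id, key))


store = XComStoreSketch()
store.push("etl", "run_2026_06_29", "extract", {"records": 1500})
print(store.pull("etl", "run_2026_06_29", "extract"))  # {'records': 1500}
print(store.pull("etl", "run_2026_06_30", "extract"))  # None: other runs don't see it
```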
# =============================================
# XCom: Task Communication
# =============================================
# Push: a callable's return value automatically becomes an XCom (key "return_value")
def extract(**context):
    data = {"records": 1500, "date": "2026-06-29"}
    return data  # pushed to XCom automatically
# Pull the value pushed by "extract"
def transform(**context):
    ti = context["ti"]
    data = ti.xcom_pull(task_ids="extract", key="return_value")
    print(f"Received {data['records']} records")
    # Manual push of the transform result under a custom key
    ti.xcom_push(key="transform_result", value={"clean_records": 1450})
# Pull a value that was pushed under a custom key
def load(**context):
    ti = context["ti"]
    result = ti.xcom_pull(task_ids="transform", key="transform_result")
    print(f"Loading {result['clean_records']} records")
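# ⚠️ XCom is not meant for large data: by default values are stored in the metadata
# database, so keep them small (the exact size limit depends on the database backend)
# For large data: save it to a file/S3 and pass only the path via XCom
def extract_large(**context):
    import pandas as pd
    df = pd.read_csv("large_file.csv")
    path = "/tmp/extracted.parquet"
    df.to_parquet(path)  # requires pyarrow or fastparquet
    return path  # pass only the file path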
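The "pass a path, not the data" rule can be turned into a small guard. This is a sketch under assumptions: to_xcom_value and its max_inline_bytes threshold are hypothetical (not an Airflow setting), and a local directory stands in for S3 or shared storage.

```python
import json
import os


def to_xcom_value(data, directory, max_inline_bytes=10_000):
    """Return small data as-is; write large data to a JSON file and return only its path.

    max_inline_bytes is a hypothetical threshold, not an Airflow setting.
    """
    encoded = json.dumps(data).encode("utf-8")
    if len(encoded) <= max_inline_bytes:
        return data
    path = os.path.join(directory, "payload.json")
    with open(path, "wb") as f:
        f.write(encoded)
    return {"path": path}
```

A downstream task would check for the "path" key and read the file itself; in a multi-machine deployment the directory must be storage that every worker can reach.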
6. Sensors & Triggers
A sensor is a special kind of task that waits until a condition is met (a file exists, a task in another DAG has finished, an API responds) before downstream tasks continue.
# =============================================
# Sensors
# =============================================
from airflow.sensors.filesystem import FileSensor
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.common.sql.sensors.sql import SqlSensor  # from the common.sql provider
from airflow.sensors.python import PythonSensor
# FileSensor: wait for a file to appear
wait_for_file = FileSensor(
    task_id="wait_for_data_file",
    filepath="/data/incoming/sales_{{ ds }}.csv",  # {{ ds }} renders as the run's logical date (YYYY-MM-DD)
    fs_conn_id="fs_default",  # filesystem connection (fs_default is the default)
    poke_interval=30,  # check every 30 seconds
    timeout=3600,  # give up after 1 hour
    mode="reschedule",  # free the worker slot between checks
    dag=dag,
)
# ExternalTaskSensor: wait for a task in another DAG to finish
# (by default it looks for the run with the same logical date)
wait_for_other_dag = ExternalTaskSensor(
    task_id="wait_for_extract_dag",
    external_dag_id="extract_pipeline",
    external_task_id="load",
    timeout=7200,
    dag=dag,
)
# PythonSensor: custom condition
def check_api_available():
    import requests
    try:
        r = requests.get("https://api.example.com/health", timeout=5)
        return r.status_code == 200
    except requests.RequestException:  # connection errors, timeouts, etc. count as "not ready"
        return False
wait_for_api = PythonSensor(
    task_id="wait_for_api",
    python_callable=check_api_available,
    poke_interval=60,
    timeout=1800,
    dag=dag,
)
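The waiting logic behind poke_interval and timeout can be sketched as a simple loop. This is a simplified model of poke mode, not Airflow's implementation: Airflow raises its own AirflowSensorTimeout, and in reschedule mode the task is re-queued between checks instead of sleeping in a worker slot. The clock and sleep parameters are injectable so the loop can be tested without real waiting.

```python
import time


def poke_until(condition, poke_interval, timeout, clock=time.monotonic, sleep=time.sleep):
    """Poke-mode sensor sketch: call condition() until it is True or the timeout is reached.

    Returns the number of pokes; raises TimeoutError if the condition never became True.
    """
    start = clock()
    pokes = 0
    while True:
        pokes += 1
        if condition():
            return pokes
        if clock() - start >= timeout:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        sleep(poke_interval)  # in poke mode the worker slot stays occupied while sleeping


answers = iter([False, False, True])
print(poke_until(lambda: next(answers), poke_interval=0.01, timeout=5))  # 3
```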
7. Pools, Priority & Concurrency
# =============================================
# Pools & Priority
# =============================================
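# Pool: limit how many tasks use a given resource at the same time
# Create the pool via the CLI:
# airflow pools set db_pool 5 "Pool for database tasks"
# DAG-level concurrency settings
dag = DAG(
    "my_dag",
    start_date=datetime(2026, 1, 1),
    max_active_runs=2,  # at most 2 runs of this DAG at the same time
    max_active_tasks=10,  # at most 10 running task instances across all active runs of this DAG
    # `concurrency` is the older, deprecated name for max_active_tasks, so don't set both
)
def my_func():  # placeholder callable
    print("database work")
def heavy_func():  # placeholder callable
    print("heavy work")
task_db = PythonOperator(
    task_id="db_operation",
    python_callable=my_func,
    pool="db_pool",  # run inside this pool
    pool_slots=1,  # occupy 1 of the pool's slots
    priority_weight=10,  # higher weight = scheduled earlier when slots are scarce
    dag=dag,
)
# Task-level settings
task = PythonOperator(
    task_id="heavy_task",
    python_callable=heavy_func,
    retries=3,
    retry_delay=timedelta(minutes=10),
    execution_timeout=timedelta(hours=1),
    queue="heavy",  # worker queue (used by queue-aware executors such as CeleryExecutor)
    dag=dag,
)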
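How pool_slots and priority_weight interact can be approximated like this: queued tasks are considered from highest to lowest priority weight and admitted while the pool has free slots. This is a simplified, hypothetical model (admit_to_pool is not part of Airflow); the real scheduler also applies DAG-level and global concurrency limits.

```python
import heapq


def admit_to_pool(tasks, pool_slots):
    """Pick which queued tasks get pool slots in one scheduling round.

    tasks: list of (task_id, priority_weight, slots_needed)
    Returns admitted task_ids, highest priority_weight first (ties keep queue order).
    """
    # heapq is a min-heap, so negate the weight to pop the highest priority first
    heap = [(-weight, order, task_id, slots) for order, (task_id, weight, slots) in enumerate(tasks)]
    heapq.heapify(heap)
    free = pool_slots
    admitted = []
    while heap:
        _, _, task_id, slots = heapq.heappop(heap)
        if slots <= free:  # a task that doesn't fit waits for a later round
            free -= slots
            admitted.append(task_id)
    return admitted


queued = [("a", 1, 1), ("b", 10, 1), ("c", 5, 2), ("d", 1, 1)]
print(admit_to_pool(queued, pool_slots=3))  # ['b', 'c']
```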
8. Backfill & Catchup
# =============================================
# Backfill & Catchup
# =============================================
# Backfill: run the DAG for a specific date range (run in a shell)
airflow dags backfill \
-s 2026-01-01 \
-e 2026-06-01 \
etl_penjualan_harian
# Catchup: automatically create runs for every missed interval
# Set on the DAG (in Airflow 2.x, catchup defaults to True):
dag = DAG(
    "my_dag",
    start_date=datetime(2026, 1, 1),
    catchup=True,  # create runs for all intervals since start_date
)
# Disable catchup when only the latest interval matters (common in production):
dag = DAG(
    "my_dag",
    start_date=datetime(2026, 1, 1),
    catchup=False,  # only schedule the most recent interval
)
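Which runs catchup creates follows from the data-interval model: with a daily 06:00 schedule such as "0 6 * * *", the run for an interval is created only after that interval ends. The helper below is a simplified, hypothetical sketch for this one schedule shape (naive datetimes, no timezones), not Airflow's timetable code.

```python
from datetime import datetime, timedelta


def pending_daily_runs(start_date, now, catchup, hour=6):
    """Return (interval_start, interval_end) pairs for a daily schedule at `hour`:00.

    A run for an interval is only due once the interval has ended.
    With catchup=False only the most recent completed interval is returned.
    """
    first = start_date.replace(hour=hour, minute=0, second=0, microsecond=0)
    if first < start_date:
        first += timedelta(days=1)
    intervals = []
    start = first
    while start + timedelta(days=1) <= now:
        intervals.append((start, start + timedelta(days=1)))
        start += timedelta(days=1)
    return intervals if catchup else intervals[-1:]


runs = pending_daily_runs(datetime(2026, 1, 1), datetime(2026, 1, 4, 7), catchup=True)
print(len(runs))  # 3
latest = pending_daily_runs(datetime(2026, 1, 1), datetime(2026, 1, 4, 7), catchup=False)
print(latest[0][0].isoformat())  # 2026-01-03T06:00:00
```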
9. Monitoring & Best Practices
- Idempotent tasks: rerunning a task for the same interval gives the same result, without side effects such as duplicate rows
- Small data via XCom: for large data, save it to a file and pass the path
- Atomic tasks: one task = one responsibility
- Use templates: use Jinja templates (e.g. {{ ds }}) for dynamic values
- Version-control DAGs: keep them in Git
- Alerting: use email/Slack for failure notifications
- Test DAGs: use `airflow tasks test` before deploying
- Avoid top-level code: don't run heavy code when the DAG file is imported, because the scheduler re-parses DAG files regularly
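Idempotency is easiest to see in a load step. The sketch below uses sqlite3 from the standard library as a stand-in for the target database (load_partition is a hypothetical helper): it deletes and re-inserts the rows for one date inside a single transaction, so a retry or rerun leaves the same data, whereas an append-only load would duplicate it.

```python
import sqlite3


def load_partition(conn, ds, totals):
    """Idempotent load: replace all rows for date `ds` inside one transaction."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM sales WHERE tanggal = ?", (ds,))
        conn.executemany(
            "INSERT INTO sales (tanggal, total) VALUES (?, ?)",
            [(ds, total) for total in totals],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (tanggal TEXT, total REAL)")
load_partition(conn, "2026-06-29", [100.0, 250.5])
load_partition(conn, "2026-06-29", [100.0, 250.5])  # rerun, e.g. after a retry
row_count = conn.execute("SELECT COUNT(*) FROM sales").fetchone()[0]
print(row_count)  # 2
```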
10. Comprehension Quiz
1. What is a DAG in Airflow?
2. What is XCom used for?
3. What is the difference between an Operator and a Sensor?
4. What is a backfill?
5. Why should tasks be idempotent?
Summary
- Airflow: an orchestration platform for scheduling and managing data pipelines
- DAG: a pipeline defined as Python code, with dependencies between tasks
- Operators: the building blocks (Python, Bash, SQL, Email, Docker)
- XCom: communication between tasks (small data only)
- Sensors: wait for a condition (a file, an API, a database)
- Best practices: idempotent and atomic tasks, version control