Python

Python asyncio: Async Programming dari Nol

Tutorial lengkap asyncio — event loop, coroutines, tasks, semaphores, queues, dan membangun aplikasi asynchronous yang efisien

1. Pengenalan asyncio

asyncio adalah library standar Python untuk menulis kode yang bersifat concurrent menggunakan model single-threaded, cooperative multitasking. Berbeda dengan threading yang menggunakan preemptive multitasking, asyncio memungkinkan satu thread menangani ribuan operasi I/O secara bersamaan tanpa blocking.

Bayangkan kamu sedang memesan makanan di restoran. Dengan pendekatan synchronous, kamu menunggu pesanan selesai sebelum melakukan apapun. Dengan async, kamu pesan makanan, lalu sambil menunggu kamu bisa cek HP, ngobrol, atau pesan minuman — begitu makanan siap, kamu langsung mengambilnya.

Kapan Menggunakan asyncio?

Skenario Gunakan asyncio? Alasan
HTTP requests massalāœ… Sangat cocokMenunggu response = I/O bound
Database queriesāœ… Sangat cocokMenunggu hasil query = I/O bound
File I/Oāš ļø CukupLebih baik gunakan aiofiles
Kalkulasi CPU intensifāŒ Tidak cocokGunakan multiprocessing
Web scraping massalāœ… Sangat cocokRibuan request sekaligus
WebSocket serverāœ… Sangat cocokLong-lived connections

asyncio vs Threading vs Multiprocessing

Aspek asyncio Threading Multiprocessing
MekanismeSingle thread, cooperativeMulti thread, preemptiveMulti process
Overhead🟢 Rendah🟔 SedangšŸ”“ Tinggi
Cocok untukI/O boundI/O boundCPU bound
GIL ImpactTidak masalahDibatasi GILTidak terpengaruh
Scalability10K+ tasks100-an threadsSejumlah core
Diagram: asyncio Event Loop
ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”
│                   EVENT LOOP                         │
│                                                     │
│  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”  │
│  │ Task A   │  │ Task B   │  │ Task C           │  │
│  │ (await   │  │ (await   │  │ (await            │  │
│  │  DB)     │  │  HTTP)   │  │  sleep)           │  │
│  ā””ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”˜  ā””ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”˜  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜  │
│       │              │                  │            │
│       ā–¼              ā–¼                  ā–¼            │
│  ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā” │
│  │           Selector / Callback Queue            │ │
│  │   (memantau kapan operasi I/O selesai)         │ │
│  ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜ │
│                                                     │
│  Operasi I/O selesai → callback dieksekusi          │
│  Operasi belum selesai → lanjut ke task berikutnya  │
ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜

2. Coroutines & async/await

Coroutines adalah fungsi spesial yang didefinisikan dengan async def. Coroutine tidak langsung dieksekusi saat dipanggil — ia mengembalikan sebuah coroutine object yang harus di-await atau dijalankan melalui event loop.

# Coroutine sederhana
import asyncio

async def sapa(nama: str) -> str:
    """Coroutine sederhana yang menyapa"""
    print(f"Halo, {nama}!")
    await asyncio.sleep(1)  # Simulasi operasi async
    return f"Selamat datang, {nama}"

# Menjalankan coroutine
async def main():
    # Method 1: await langsung
    hasil = await sapa("Budi")
    print(hasil)

    # Method 2: Menggunakan asyncio.run()
asyncio.run(main())
āš ļø Perbedaan Penting

Memanggil fungsi async def tanpa await TIDAK menjalankan fungsi tersebut! Kamu hanya membuat coroutine object. Ini adalah kesalahan paling umum pemula asyncio.

# āŒ SALAH: Coroutine tidak berjalan
async def ambil_data():
    print("Mengambil data...")
    await asyncio.sleep(1)
    return "data"

# Ini hanya membuat coroutine object, tidak menjalankan apapun!
coro = ambil_data()  # Tidak ada output!

# āœ… BENAR: Await coroutine
async def main():
    hasil = await ambil_data()  # Sekarang berjalan
    print(hasil)

# atau jalankan dengan asyncio.run()
asyncio.run(ambil_data())

Nested Coroutines

import asyncio
import time

async def ambil_data_api(url: str) -> dict:
    """Simulasi fetch dari API"""
    print(f"  Fetching {url}...")
    await asyncio.sleep(1)  # Simulasi network delay
    return {"url": url, "status": 200, "data": [1, 2, 3]}

async def proses_data(data: dict) -> str:
    """Proses data yang sudah diambil"""
    print(f"  Processing {data['url']}...")
    await asyncio.sleep(0.5)  # Simulasi processing
    return f"Processed: {len(data['data'])} items"

async def pipeline(url: str) -> str:
    """Pipeline: ambil data → proses → return"""
    data = await ambil_data_api(url)   # Step 1
    hasil = await proses_data(data)    # Step 2
    return hasil

async def main():
    mulai = time.time()
    hasil = await pipeline("https://api.example.com/users")
    selesai = time.time()
    print(f"Hasil: {hasil}")
    print(f"Waktu: {selesai - mulai:.2f}s")

asyncio.run(main())

Coroutine Lifecycle

Status Penjelasan
PENDINGCoroutine belum selesai
RUNNINGSedang dieksekusi
DONESelesai (sukses atau error)
CANCELLEDDibatalkan

3. Event Loop

Event loop adalah jantung dari asyncio — sebuah infinite loop yang menjadwalkan dan mengeksekusi coroutines, callbacks, dan I/O operations. Event loop memantau semua task dan menjalankan yang sudah siap.

import asyncio

async def tugas(nama, durasi):
    print(f"[{nama}] Mulai, durasi {durasi}s")
    await asyncio.sleep(durasi)
    print(f"[{nama}] Selesai!")
    return f"Hasil dari {nama}"

async def main():
    # Mendapatkan event loop saat ini
    loop = asyncio.get_running_loop()
    print(f"Event loop type: {type(loop)}")

    # Menjalankan beberapa task sekaligus
    task_a = loop.create_task(tugas("A", 2))
    task_b = loop.create_task(tugas("B", 1))
    task_c = loop.create_task(tugas("C", 3))

    # Menunggu semua selesai
    hasil = await asyncio.gather(task_a, task_b, task_c)
    print(f"Semua hasil: {hasil}")

asyncio.run(main())

Jalankan Blocking Code di Event Loop

import asyncio
import time

def operasi_blocking():
    """Fungsi biasa yang blocking (tidak async)"""
    time.sleep(3)
    return "Selesai dari blocking"

async def main():
    loop = asyncio.get_running_loop()

    # Jalankan fungsi blocking di executor (thread pool)
    # Agar event loop tidak ter-block
    hasil = await loop.run_in_executor(None, operasi_blocking)
    print(hasil)

    # Dengan custom executor
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [
            loop.run_in_executor(executor, operasi_blocking)
            for _ in range(4)
        ]
        hasil = await asyncio.gather(*futures)
        print(f"Semua selesai: {hasil}")

asyncio.run(main())
šŸ’” Tips

Gunakan run_in_executor() untuk menjalankan library synchronous (seperti requests atau time.sleep) tanpa mem-block event loop. Default executor menggunakan ThreadPoolExecutor.

4. Tasks & Futures

Task adalah wrapper untuk coroutine yang menjadwalkan eksekusinya di event loop. Task memungkinkan beberapa coroutine berjalan secara concurrent. Future adalah objek yang mewakili hasil yang belum tersedia — Task adalah subclass dari Future.

import asyncio

async def hitung_faktorial(n: int) -> int:
    """Hitung faktorial secara async dengan delay kecil"""
    hasil = 1
    for i in range(1, n + 1):
        hasil *= i
        if i % 5 == 0:
            await asyncio.sleep(0)  # Yield control ke event loop
    return hasil

async def main():
    # Membuat tasks dari coroutines
    tasks = []
    for i in range(5, 15):
        task = asyncio.create_task(hitung_faktorial(i), name=f"fact-{i}")
        tasks.append(task)

    print(f"Dibuat {len(tasks)} tasks")

    # Menunggu semua task selesai
    hasil = await asyncio.gather(*tasks)

    for i, h in enumerate(hasil):
        print(f"  {i+5}! = {h}")

asyncio.run(main())

Melihat Status Task

import asyncio

async def tugas_lama(nama, durasi):
    await asyncio.sleep(durasi)
    return f"{nama} selesai"

async def main():
    task = asyncio.create_task(tugas_lama("API Call", 5), name="api-task")

    # Cek status sebelum selesai
    print(f"Task name: {task.get_name()}")
    print(f"Done? {task.done()}")       # False
    print(f"Cancelled? {task.cancelled()}")  # False

    # Tunggu selesai
    hasil = await task

    print(f"Done? {task.done()}")       # True
    print(f"Hasil: {hasil}")

    # Akses exception jika ada
    print(f"Exception: {task.exception()}")  # None

asyncio.run(main())

Membatalkan Task

import asyncio

async def tugas_panjang():
    try:
        print("Tugas panjang mulai...")
        await asyncio.sleep(60)  # Tidur 60 detik
        return "Selesai"
    except asyncio.CancelledError:
        print("Tugas dibatalkan! Membersihkan...")
        await asyncio.sleep(0.1)  # Cleanup
        raise  # Re-raise agar status CANCELLED

async def main():
    task = asyncio.create_task(tugas_panjang())

    # Batalkan setelah 2 detik
    await asyncio.sleep(2)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print(f"Task dibatalkan: {task.cancelled()}")

asyncio.run(main())

Futures

import asyncio

async def main():
    loop = asyncio.get_running_loop()

    # Buat future
    future = loop.create_future()

    # Simulasi callback yang men-set nilai future
    async def set_future_value():
        await asyncio.sleep(2)
        future.set_result(42)  # Set hasil
        print("Future value set!")

    # Jalankan setter dan tunggu future
    asyncio.create_task(set_future_value())
    hasil = await future  # Tunggu sampai value di-set
    print(f"Future result: {hasil}")

asyncio.run(main())

5. asyncio.gather & Konkurensi

asyncio.gather() adalah cara paling umum untuk menjalankan beberapa coroutine secara concurrent dan menunggu semuanya selesai. Ini sangat powerful untuk operasi I/O yang bisa dilakukan paralel.

import asyncio
import time

async def fetch_api(nama: str, delay: float) -> str:
    """Simulasi API call"""
    print(f"[{nama}] Memulai request...")
    await asyncio.sleep(delay)
    print(f"[{nama}] Selesai dalam {delay}s")
    return f"Response dari {nama}"

async def main():
    mulai = time.time()

    # āŒ SEQUENTIAL: Total 6 detik
    # a = await fetch_api("API-1", 2)
    # b = await fetch_api("API-2", 2)
    # c = await fetch_api("API-3", 2)

    # āœ… CONCURRENT: Total 2 detik!
    a, b, c = await asyncio.gather(
        fetch_api("API-1", 2),
        fetch_api("API-2", 2),
        fetch_api("API-3", 2),
    )

    selesai = time.time()
    print(f"\nSemua selesai dalam {selesai - mulai:.2f}s")
    print(f"Hasil: {a}, {b}, {c}")

asyncio.run(main())

TaskGroup (Python 3.11+)

import asyncio

async def ambil_data(url: str) -> dict:
    await asyncio.sleep(1)
    return {"url": url, "data": "ok"}

async def main():
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/albums",
    ]

    # TaskGroup: Lebih aman, batalkan semua jika ada error
    async with asyncio.TaskGroup() as tg:
        tasks = [tg.create_task(ambil_data(url)) for url in urls]

    # Semua task sudah selesai di sini
    for task in tasks:
        print(task.result())

asyncio.run(main())
āš ļø gather vs TaskGroup

gather(): Jika satu task gagal, task lain tetap berjalan. TaskGroup: Jika satu task gagal, semua task lain dibatalkan. Gunakan TaskGroup untuk perilaku yang lebih aman.

as_completed: Proses Hasil Seiring Tersedia

import asyncio

async def tugas(nama, durasi):
    await asyncio.sleep(durasi)
    return f"{nama} selesai dalam {durasi}s"

async def main():
    tasks = [
        tugas("Cepat", 1),
        tugas("Sedang", 3),
        tugas("Lambat", 5),
    ]

    # Proses seiring selesai (tidak perlu tunggu semua)
    for coro in asyncio.as_completed(tasks):
        hasil = await coro
        print(f"āœ“ {hasil}")  # Yang cepat diproses duluan

asyncio.run(main())

6. Semaphore & Throttling

Semaphore membatasi jumlah task yang bisa berjalan secara concurrent. Ini sangat penting untuk menghindari overwhelming API server, membatasi koneksi database, atau mencegah resource exhaustion.

import asyncio
import time

async def akses_api(id: int, semaphore: asyncio.Semaphore):
    async with semaphore:  # Batasi concurrent access
        print(f"[{id}] Mulai akses API...")
        await asyncio.sleep(1)  # Simulasi API call
        print(f"[{id}] Selesai!")
        return f"Data-{id}"

async def main():
    # Batasi hanya 3 request bersamaan
    semaphore = asyncio.Semaphore(3)
    mulai = time.time()

    # Buat 10 tasks, tapi hanya 3 yang jalan sekaligus
    tasks = [akses_api(i, semaphore) for i in range(10)]
    hasil = await asyncio.gather(*tasks)

    selesai = time.time()
    print(f"\n10 requests selesai dalam {selesai - mulai:.2f}s")
    # Seharusnya ~4 detik (3 batch Ɨ 1 detik + 1 batch)
    print(f"Semua data: {len(hasil)} item")

asyncio.run(main())

BoundedSemaphore untuk Keamanan Ekstra

import asyncio

async def main():
    # BoundedSemaphore tidak bisa di-release melebihi nilai awal
    sem = asyncio.BoundedSemaphore(2)

    async def tugas(n):
        async with sem:
            print(f"  Task {n} masuk")
            await asyncio.sleep(1)
            print(f"  Task {n} keluar")

    await asyncio.gather(*[tugas(i) for i in range(5)])

asyncio.run(main())

Rate Limiter Custom

import asyncio
import time

class RateLimiter:
    """Rate limiter: max N request per detik"""

    def __init__(self, max_rate: int):
        self.semaphore = asyncio.Semaphore(max_rate)
        self.max_rate = max_rate

    async def acquire(self):
        await self.semaphore.acquire()

    async def release(self):
        await asyncio.sleep(1.0 / self.max_rate)
        self.semaphore.release()

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, *args):
        await self.release()

async def fetch_with_rate_limit(url: str, limiter: RateLimiter):
    async with limiter:
        print(f"[{time.time():.1f}] Fetching {url}")
        await asyncio.sleep(0.1)
        return f"Data dari {url}"

async def main():
    limiter = RateLimiter(5)  # Max 5 per detik
    urls = [f"https://api.example.com/page/{i}" for i in range(20)]

    tasks = [fetch_with_rate_limit(u, limiter) for u in urls]
    hasil = await asyncio.gather(*tasks)
    print(f"Selesai: {len(hasil)} requests")

asyncio.run(main())

7. Queues & Producer-Consumer

asyncio.Queue memungkinkan pattern producer-consumer dimana satu sisi memproduksi data dan sisi lain mengkonsumsinya — cocok untuk pipeline processing, task workers, dan data streaming.

import asyncio
import random

async def producer(queue: asyncio.Queue, nama: str, jumlah: int):
    """Menghasilkan item dan memasukkannya ke queue"""
    for i in range(jumlah):
        item = f"{nama}-item-{i}"
        await asyncio.sleep(random.uniform(0.1, 0.5))
        await queue.put(item)
        print(f"  šŸ“¦ [{nama}] Produksi: {item}")
    print(f"  āœ… [{nama}] Selesai memproduksi")

async def consumer(queue: asyncio.Queue, nama: str):
    """Mengambil dan memproses item dari queue"""
    while True:
        item = await queue.get()
        if item is None:  # Poison pill = stop signal
            break
        print(f"  āš™ļø [{nama}] Proses: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.6))
        queue.task_done()
    print(f"  āœ… [{nama}] Consumer selesai")

async def main():
    queue = asyncio.Queue(maxsize=5)  # Buffer size 5

    # 2 producers, 3 consumers
    producers = [
        asyncio.create_task(producer(queue, "P1", 5)),
        asyncio.create_task(producer(queue, "P2", 5)),
    ]
    consumers = [
        asyncio.create_task(consumer(queue, f"C{i}"))
        for i in range(3)
    ]

    # Tunggu producers selesai
    await asyncio.gather(*producers)
    print("\nSemua producer selesai, menunggu queue kosong...")

    # Tunggu semua item diproses
    await queue.join()

    # Kirim poison pills untuk stop consumers
    for _ in consumers:
        await queue.put(None)
    await asyncio.gather(*consumers)

    print("\nāœ… Pipeline selesai!")

asyncio.run(main())

Tipe Queue Lainnya

import asyncio

async def main():
    # 1. LIFO Queue: Last In First Out
    lifo = asyncio.LifoQueue()
    await lifo.put("pertama")
    await lifo.put("kedua")
    await lifo.put("ketiga")
    print(await lifo.get())  # "ketiga" (yang terakhir masuk)

    # 2. Priority Queue: Item dengan prioritas terendah duluan
    pq = asyncio.PriorityQueue()
    await pq.put((3, "Prioritas rendah"))
    await pq.put((1, "Prioritas tinggi"))
    await pq.put((2, "Prioritas sedang"))

    while not pq.empty():
        prio, item = await pq.get()
        print(f"  [{prio}] {item}")
    # Output: Prioritas tinggi, sedang, rendah

asyncio.run(main())

8. Timeout & Error Handling

Timeout sangat penting dalam async programming untuk mencegah task yang tidak pernah selesai (hang). asyncio menyediakan beberapa mekanisme timeout yang robust.

import asyncio

async def operasi_lambat():
    """Operasi yang bisa sangat lambat"""
    await asyncio.sleep(100)  # Akan timeout sebelum selesai
    return "data"

async def main():
    # Method 1: asyncio.timeout (Python 3.11+)
    try:
        async with asyncio.timeout(3):
            await operasi_lambat()
    except TimeoutError:
        print("ā° Timeout! Operasi terlalu lambat")

    # Method 2: asyncio.wait_for
    try:
        hasil = await asyncio.wait_for(operasi_lambat(), timeout=3)
    except TimeoutError:
        print("ā° Timeout via wait_for!")

    # Method 3: asyncio.timeout_at (deadline-based)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + 3
    try:
        async with asyncio.timeout_at(deadline):
            await operasi_lambat()
    except TimeoutError:
        print("ā° Deadline terlewat!")

asyncio.run(main())

Menangani Exception di Multiple Tasks

import asyncio

async def tugas_aman(n):
    await asyncio.sleep(1)
    return f"Task {n} OK"

async def tugas_error(n):
    await asyncio.sleep(0.5)
    raise ValueError(f"Error di task {n}!")

async def main():
    # gather dengan return_exceptions=True
    hasil = await asyncio.gather(
        tugas_aman(1),
        tugas_error(2),
        tugas_aman(3),
        return_exceptions=True  # Error sebagai hasil, bukan raise
    )

    for i, h in enumerate(hasil):
        if isinstance(h, Exception):
            print(f"  āŒ Task {i+1}: {h}")
        else:
            print(f"  āœ… Task {i+1}: {h}")

    # Dengan TaskGroup (Python 3.11+)
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(tugas_aman(1))
            tg.create_task(tugas_error(2))  # Ini akan membatalkan semua
            tg.create_task(tugas_aman(3))
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"  Caught: {exc}")

asyncio.run(main())

Shield: Lindungi Task dari Pembatalan

import asyncio

async def operasi_penting():
    """Operasi yang TIDAK boleh dibatalkan"""
    print("  Menyimpan data ke database...")
    await asyncio.sleep(3)
    print("  āœ… Data tersimpan!")
    return "Tersimpan"

async def main():
    try:
        # shield melindungi inner coroutine dari CancelledError
        hasil = await asyncio.wait_for(
            asyncio.shield(operasi_penting()),
            timeout=2
        )
    except TimeoutError:
        print("ā° Timeout, tapi operasi_penting tetap berjalan!")
        # Tunggu sebentar untuk membuktikan operasi_penting selesai
        await asyncio.sleep(2)

asyncio.run(main())

9. Async HTTP & Database

Salah satu use case terbaik asyncio adalah concurrent HTTP requests dan database queries. Berikut contoh menggunakan library populer.

# Install: pip install aiohttp
import asyncio
import aiohttp

async def fetch(session, url):
    """Fetch satu URL"""
    async with session.get(url) as response:
        data = await response.json()
        return {"url": url, "status": response.status, "data": data}

async def fetch_sem(session, url, semaphore):
    """Fetch dengan semaphore untuk limit concurrent"""
    async with semaphore:
        return await fetch(session, url)

async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
        "https://jsonplaceholder.typicode.com/users/1",
        "https://jsonplaceholder.typicode.com/todos/1",
    ]

    semaphore = asyncio.Semaphore(3)  # Max 3 concurrent

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_sem(session, url, semaphore) for url in urls]
        hasil = await asyncio.gather(*tasks, return_exceptions=True)

    for h in hasil:
        if isinstance(h, Exception):
            print(f"  āŒ Error: {h}")
        else:
            print(f"  āœ… {h['url']}: {h['status']}")

asyncio.run(main())
# Async Database dengan aiosqlite
# Install: pip install aiosqlite
import asyncio
import aiosqlite

async def main():
    async with aiosqlite.connect("example.db") as db:
        # Create table
        await db.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY,
                nama TEXT,
                email TEXT
            )
        """)

        # Insert data
        await db.execute(
            "INSERT INTO users (nama, email) VALUES (?, ?)",
            ("Budi", "budi@email.com")
        )
        await db.commit()

        # Select data
        async with db.execute("SELECT * FROM users") as cursor:
            async for row in cursor:
                print(f"  User: {row}")

asyncio.run(main())

Async Web Scraper

# Web scraper dengan concurrency control
import asyncio
import aiohttp
from dataclasses import dataclass

@dataclass
class PageResult:
    url: str
    title: str
    status: int
    size: int

async def scrape_page(session, url, semaphore):
    async with semaphore:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                html = await resp.text()
                # Simple title extraction
                start = html.find("") + 7
                end = html.find("")
                title = html[start:end] if start > 6 else "No title"
                return PageResult(url, title.strip(), resp.status, len(html))
        except Exception as e:
            return PageResult(url, f"Error: {e}", 0, 0)

async def main():
    urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
    sem = asyncio.Semaphore(5)

    async with aiohttp.ClientSession() as session:
        tasks = [scrape_page(session, url, sem) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for r in results:
        if isinstance(r, PageResult):
            print(f"  [{r.status}] {r.url} — {r.title} ({r.size} bytes)")

asyncio.run(main())

10. Best Practices & Common Pitfalls

āœ… Yang Harus Dilakukan

Praktik Penjelasan
Gunakan async withMemastikan resource dibersihkan dengan benar
Batasi concurrencyGunakan Semaphore agar tidak overwhelm server
Set timeoutSelalu pasang timeout untuk network operations
Gunakan asyncio.run()Cara terbaik untuk memulai event loop di main
Gunakan TaskGroupLebih aman dari gather() untuk error handling
Handle CancelledErrorBersihkan resource dengan tepat saat task dibatalkan

āŒ Pitfalls yang Harus Dihindari

import asyncio

# āŒ PITFALL 1: Lupa await
async def pitfall_1():
    # Coroutine tidak berjalan tanpa await!
    result = asyncio.sleep(1)  # Hanya membuat coroutine object!
    print(result)  # <coroutine object sleep at 0x...>

# āŒ PITFALL 2: Menggunakan library blocking di async code
async def pitfall_2():
    import time
    time.sleep(5)  # MEMBLOCK seluruh event loop!
    # āœ… Gunakan: await asyncio.sleep(5)

# āŒ PITFALL 3: Menggunakan requests library
async def pitfall_3():
    import requests
    resp = requests.get("https://api.example.com")  # BLOCKING!
    # āœ… Gunakan: aiohttp atau httpx dengan async client

# āŒ PITFALL 4: Tidak handle exception di gather
async def pitfall_4():
    # Jika satu task gagal, error hanya muncul saat await
    tasks = [asyncio.sleep(1) for _ in range(10)]
    # āœ… Gunakan return_exceptions=True atau TaskGroup

# āŒ PITFALL 5: Membuat task tapi tidak referensi
async def pitfall_5():
    asyncio.create_task(some_coroutine())  # Task bisa di-GC!
    # āœ… Simpan referensi: task = asyncio.create_task(...)
    # atau gunakan TaskGroup

print("Semua pitfall dijelaskan!")
šŸ’” Cheat Sheet asyncio

asyncio.run(coro) — Jalankan coroutine di main
await asyncio.sleep(n) — Non-blocking sleep
asyncio.create_task(coro) — Buat task concurrent
asyncio.gather(*coros) — Jalankan semua bersamaan
asyncio.wait_for(coro, timeout) — Dengan timeout
asyncio.Semaphore(n) — Batasi concurrency
asyncio.Queue() — Producer-consumer queue
asyncio.TaskGroup() — Structured concurrency (3.11+)

11. Quiz Pemahaman

1. Apa yang terjadi jika kamu memanggil fungsi async def tanpa await?

2. Fungsi mana yang digunakan untuk menjalankan beberapa coroutine bersamaan?

3. Apa fungsi dari Semaphore dalam asyncio?

4. Kenapa menggunakan time.sleep() dalam kode asyncio sangat berbahaya?

5. Apa perbedaan utama antara gather() dan TaskGroup?

šŸ” Zoom
100%
šŸŽØ Tema