1. Pengenalan asyncio
asyncio adalah library standar Python untuk menulis kode yang bersifat concurrent menggunakan model single-threaded, cooperative multitasking. Berbeda dengan threading yang menggunakan preemptive multitasking, asyncio memungkinkan satu thread menangani ribuan operasi I/O secara bersamaan tanpa blocking.
Bayangkan kamu sedang memesan makanan di restoran. Dengan pendekatan synchronous, kamu menunggu pesanan selesai sebelum melakukan apapun. Dengan async, kamu pesan makanan, lalu sambil menunggu kamu bisa cek HP, ngobrol, atau pesan minuman ā begitu makanan siap, kamu langsung mengambilnya.
Kapan Menggunakan asyncio?
| Skenario | Gunakan asyncio? | Alasan |
|---|---|---|
| HTTP requests massal | ā Sangat cocok | Menunggu response = I/O bound |
| Database queries | ā Sangat cocok | Menunggu hasil query = I/O bound |
| File I/O | ā ļø Cukup | Lebih baik gunakan aiofiles |
| Kalkulasi CPU intensif | ā Tidak cocok | Gunakan multiprocessing |
| Web scraping massal | ā Sangat cocok | Ribuan request sekaligus |
| WebSocket server | ā Sangat cocok | Long-lived connections |
asyncio vs Threading vs Multiprocessing
| Aspek | asyncio | Threading | Multiprocessing |
|---|---|---|---|
| Mekanisme | Single thread, cooperative | Multi thread, preemptive | Multi process |
| Overhead | š¢ Rendah | š” Sedang | š“ Tinggi |
| Cocok untuk | I/O bound | I/O bound | CPU bound |
| GIL Impact | Tidak masalah | Dibatasi GIL | Tidak terpengaruh |
| Scalability | 10K+ tasks | 100-an threads | Sejumlah core |
āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā EVENT LOOP ā ā ā ā āāāāāāāāāāāā āāāāāāāāāāāā āāāāāāāāāāāāāāāāāāāā ā ā ā Task A ā ā Task B ā ā Task C ā ā ā ā (await ā ā (await ā ā (await ā ā ā ā DB) ā ā HTTP) ā ā sleep) ā ā ā āāāāāā¬āāāāāā āāāāāā¬āāāāāā āāāāāāāāāā¬āāāāāāāāāā ā ā ā ā ā ā ā ā¼ ā¼ ā¼ ā ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā ā Selector / Callback Queue ā ā ā ā (memantau kapan operasi I/O selesai) ā ā ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā ā ā ā ā Operasi I/O selesai ā callback dieksekusi ā ā Operasi belum selesai ā lanjut ke task berikutnya ā āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā
2. Coroutines & async/await
Coroutines adalah fungsi spesial yang didefinisikan dengan async def. Coroutine tidak langsung dieksekusi saat dipanggil ā ia mengembalikan sebuah coroutine object yang harus di-await atau dijalankan melalui event loop.
# Coroutine sederhana
import asyncio
async def sapa(nama: str) -> str:
"""Coroutine sederhana yang menyapa"""
print(f"Halo, {nama}!")
await asyncio.sleep(1) # Simulasi operasi async
return f"Selamat datang, {nama}"
# Menjalankan coroutine
async def main():
# Method 1: await langsung
hasil = await sapa("Budi")
print(hasil)
# Method 2: Menggunakan asyncio.run()
asyncio.run(main())
Memanggil fungsi async def tanpa await TIDAK menjalankan fungsi tersebut! Kamu hanya membuat coroutine object. Ini adalah kesalahan paling umum pemula asyncio.
# ā SALAH: Coroutine tidak berjalan
async def ambil_data():
print("Mengambil data...")
await asyncio.sleep(1)
return "data"
# Ini hanya membuat coroutine object, tidak menjalankan apapun!
coro = ambil_data() # Tidak ada output!
# ā
BENAR: Await coroutine
async def main():
hasil = await ambil_data() # Sekarang berjalan
print(hasil)
# atau jalankan dengan asyncio.run()
asyncio.run(ambil_data())
Nested Coroutines
import asyncio
import time
async def ambil_data_api(url: str) -> dict:
"""Simulasi fetch dari API"""
print(f" Fetching {url}...")
await asyncio.sleep(1) # Simulasi network delay
return {"url": url, "status": 200, "data": [1, 2, 3]}
async def proses_data(data: dict) -> str:
"""Proses data yang sudah diambil"""
print(f" Processing {data['url']}...")
await asyncio.sleep(0.5) # Simulasi processing
return f"Processed: {len(data['data'])} items"
async def pipeline(url: str) -> str:
"""Pipeline: ambil data ā proses ā return"""
data = await ambil_data_api(url) # Step 1
hasil = await proses_data(data) # Step 2
return hasil
async def main():
mulai = time.time()
hasil = await pipeline("https://api.example.com/users")
selesai = time.time()
print(f"Hasil: {hasil}")
print(f"Waktu: {selesai - mulai:.2f}s")
asyncio.run(main())
Coroutine Lifecycle
| Status | Penjelasan |
|---|---|
PENDING | Coroutine belum selesai |
RUNNING | Sedang dieksekusi |
DONE | Selesai (sukses atau error) |
CANCELLED | Dibatalkan |
3. Event Loop
Event loop adalah jantung dari asyncio ā sebuah infinite loop yang menjadwalkan dan mengeksekusi coroutines, callbacks, dan I/O operations. Event loop memantau semua task dan menjalankan yang sudah siap.
import asyncio
async def tugas(nama, durasi):
print(f"[{nama}] Mulai, durasi {durasi}s")
await asyncio.sleep(durasi)
print(f"[{nama}] Selesai!")
return f"Hasil dari {nama}"
async def main():
# Mendapatkan event loop saat ini
loop = asyncio.get_running_loop()
print(f"Event loop type: {type(loop)}")
# Menjalankan beberapa task sekaligus
task_a = loop.create_task(tugas("A", 2))
task_b = loop.create_task(tugas("B", 1))
task_c = loop.create_task(tugas("C", 3))
# Menunggu semua selesai
hasil = await asyncio.gather(task_a, task_b, task_c)
print(f"Semua hasil: {hasil}")
asyncio.run(main())
Jalankan Blocking Code di Event Loop
import asyncio
import time
def operasi_blocking():
"""Fungsi biasa yang blocking (tidak async)"""
time.sleep(3)
return "Selesai dari blocking"
async def main():
loop = asyncio.get_running_loop()
# Jalankan fungsi blocking di executor (thread pool)
# Agar event loop tidak ter-block
hasil = await loop.run_in_executor(None, operasi_blocking)
print(hasil)
# Dengan custom executor
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=4) as executor:
futures = [
loop.run_in_executor(executor, operasi_blocking)
for _ in range(4)
]
hasil = await asyncio.gather(*futures)
print(f"Semua selesai: {hasil}")
asyncio.run(main())
Gunakan run_in_executor() untuk menjalankan library synchronous (seperti requests atau time.sleep) tanpa mem-block event loop. Default executor menggunakan ThreadPoolExecutor.
4. Tasks & Futures
Task adalah wrapper untuk coroutine yang menjadwalkan eksekusinya di event loop. Task memungkinkan beberapa coroutine berjalan secara concurrent. Future adalah objek yang mewakili hasil yang belum tersedia ā Task adalah subclass dari Future.
import asyncio
async def hitung_faktorial(n: int) -> int:
"""Hitung faktorial secara async dengan delay kecil"""
hasil = 1
for i in range(1, n + 1):
hasil *= i
if i % 5 == 0:
await asyncio.sleep(0) # Yield control ke event loop
return hasil
async def main():
# Membuat tasks dari coroutines
tasks = []
for i in range(5, 15):
task = asyncio.create_task(hitung_faktorial(i), name=f"fact-{i}")
tasks.append(task)
print(f"Dibuat {len(tasks)} tasks")
# Menunggu semua task selesai
hasil = await asyncio.gather(*tasks)
for i, h in enumerate(hasil):
print(f" {i+5}! = {h}")
asyncio.run(main())
Melihat Status Task
import asyncio
async def tugas_lama(nama, durasi):
await asyncio.sleep(durasi)
return f"{nama} selesai"
async def main():
task = asyncio.create_task(tugas_lama("API Call", 5), name="api-task")
# Cek status sebelum selesai
print(f"Task name: {task.get_name()}")
print(f"Done? {task.done()}") # False
print(f"Cancelled? {task.cancelled()}") # False
# Tunggu selesai
hasil = await task
print(f"Done? {task.done()}") # True
print(f"Hasil: {hasil}")
# Akses exception jika ada
print(f"Exception: {task.exception()}") # None
asyncio.run(main())
Membatalkan Task
import asyncio
async def tugas_panjang():
try:
print("Tugas panjang mulai...")
await asyncio.sleep(60) # Tidur 60 detik
return "Selesai"
except asyncio.CancelledError:
print("Tugas dibatalkan! Membersihkan...")
await asyncio.sleep(0.1) # Cleanup
raise # Re-raise agar status CANCELLED
async def main():
task = asyncio.create_task(tugas_panjang())
# Batalkan setelah 2 detik
await asyncio.sleep(2)
task.cancel()
try:
await task
except asyncio.CancelledError:
print(f"Task dibatalkan: {task.cancelled()}")
asyncio.run(main())
Futures
import asyncio
async def main():
loop = asyncio.get_running_loop()
# Buat future
future = loop.create_future()
# Simulasi callback yang men-set nilai future
async def set_future_value():
await asyncio.sleep(2)
future.set_result(42) # Set hasil
print("Future value set!")
# Jalankan setter dan tunggu future
asyncio.create_task(set_future_value())
hasil = await future # Tunggu sampai value di-set
print(f"Future result: {hasil}")
asyncio.run(main())
5. asyncio.gather & Konkurensi
asyncio.gather() adalah cara paling umum untuk menjalankan beberapa coroutine secara concurrent dan menunggu semuanya selesai. Ini sangat powerful untuk operasi I/O yang bisa dilakukan paralel.
import asyncio
import time
async def fetch_api(nama: str, delay: float) -> str:
"""Simulasi API call"""
print(f"[{nama}] Memulai request...")
await asyncio.sleep(delay)
print(f"[{nama}] Selesai dalam {delay}s")
return f"Response dari {nama}"
async def main():
mulai = time.time()
# ā SEQUENTIAL: Total 6 detik
# a = await fetch_api("API-1", 2)
# b = await fetch_api("API-2", 2)
# c = await fetch_api("API-3", 2)
# ā
CONCURRENT: Total 2 detik!
a, b, c = await asyncio.gather(
fetch_api("API-1", 2),
fetch_api("API-2", 2),
fetch_api("API-3", 2),
)
selesai = time.time()
print(f"\nSemua selesai dalam {selesai - mulai:.2f}s")
print(f"Hasil: {a}, {b}, {c}")
asyncio.run(main())
TaskGroup (Python 3.11+)
import asyncio
async def ambil_data(url: str) -> dict:
await asyncio.sleep(1)
return {"url": url, "data": "ok"}
async def main():
urls = [
"https://api.example.com/users",
"https://api.example.com/posts",
"https://api.example.com/comments",
"https://api.example.com/albums",
]
# TaskGroup: Lebih aman, batalkan semua jika ada error
async with asyncio.TaskGroup() as tg:
tasks = [tg.create_task(ambil_data(url)) for url in urls]
# Semua task sudah selesai di sini
for task in tasks:
print(task.result())
asyncio.run(main())
gather(): Jika satu task gagal, task lain tetap berjalan. TaskGroup: Jika satu task gagal, semua task lain dibatalkan. Gunakan TaskGroup untuk perilaku yang lebih aman.
as_completed: Proses Hasil Seiring Tersedia
import asyncio
async def tugas(nama, durasi):
await asyncio.sleep(durasi)
return f"{nama} selesai dalam {durasi}s"
async def main():
tasks = [
tugas("Cepat", 1),
tugas("Sedang", 3),
tugas("Lambat", 5),
]
# Proses seiring selesai (tidak perlu tunggu semua)
for coro in asyncio.as_completed(tasks):
hasil = await coro
print(f"ā {hasil}") # Yang cepat diproses duluan
asyncio.run(main())
6. Semaphore & Throttling
Semaphore membatasi jumlah task yang bisa berjalan secara concurrent. Ini sangat penting untuk menghindari overwhelming API server, membatasi koneksi database, atau mencegah resource exhaustion.
import asyncio
import time
async def akses_api(id: int, semaphore: asyncio.Semaphore):
async with semaphore: # Batasi concurrent access
print(f"[{id}] Mulai akses API...")
await asyncio.sleep(1) # Simulasi API call
print(f"[{id}] Selesai!")
return f"Data-{id}"
async def main():
# Batasi hanya 3 request bersamaan
semaphore = asyncio.Semaphore(3)
mulai = time.time()
# Buat 10 tasks, tapi hanya 3 yang jalan sekaligus
tasks = [akses_api(i, semaphore) for i in range(10)]
hasil = await asyncio.gather(*tasks)
selesai = time.time()
print(f"\n10 requests selesai dalam {selesai - mulai:.2f}s")
# Seharusnya ~4 detik (3 batch Ć 1 detik + 1 batch)
print(f"Semua data: {len(hasil)} item")
asyncio.run(main())
BoundedSemaphore untuk Keamanan Ekstra
import asyncio
async def main():
# BoundedSemaphore tidak bisa di-release melebihi nilai awal
sem = asyncio.BoundedSemaphore(2)
async def tugas(n):
async with sem:
print(f" Task {n} masuk")
await asyncio.sleep(1)
print(f" Task {n} keluar")
await asyncio.gather(*[tugas(i) for i in range(5)])
asyncio.run(main())
Rate Limiter Custom
import asyncio
import time
class RateLimiter:
"""Rate limiter: max N request per detik"""
def __init__(self, max_rate: int):
self.semaphore = asyncio.Semaphore(max_rate)
self.max_rate = max_rate
async def acquire(self):
await self.semaphore.acquire()
async def release(self):
await asyncio.sleep(1.0 / self.max_rate)
self.semaphore.release()
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, *args):
await self.release()
async def fetch_with_rate_limit(url: str, limiter: RateLimiter):
async with limiter:
print(f"[{time.time():.1f}] Fetching {url}")
await asyncio.sleep(0.1)
return f"Data dari {url}"
async def main():
limiter = RateLimiter(5) # Max 5 per detik
urls = [f"https://api.example.com/page/{i}" for i in range(20)]
tasks = [fetch_with_rate_limit(u, limiter) for u in urls]
hasil = await asyncio.gather(*tasks)
print(f"Selesai: {len(hasil)} requests")
asyncio.run(main())
7. Queues & Producer-Consumer
asyncio.Queue memungkinkan pattern producer-consumer dimana satu sisi memproduksi data dan sisi lain mengkonsumsinya ā cocok untuk pipeline processing, task workers, dan data streaming.
import asyncio
import random
async def producer(queue: asyncio.Queue, nama: str, jumlah: int):
"""Menghasilkan item dan memasukkannya ke queue"""
for i in range(jumlah):
item = f"{nama}-item-{i}"
await asyncio.sleep(random.uniform(0.1, 0.5))
await queue.put(item)
print(f" š¦ [{nama}] Produksi: {item}")
print(f" ā
[{nama}] Selesai memproduksi")
async def consumer(queue: asyncio.Queue, nama: str):
"""Mengambil dan memproses item dari queue"""
while True:
item = await queue.get()
if item is None: # Poison pill = stop signal
break
print(f" āļø [{nama}] Proses: {item}")
await asyncio.sleep(random.uniform(0.2, 0.6))
queue.task_done()
print(f" ā
[{nama}] Consumer selesai")
async def main():
queue = asyncio.Queue(maxsize=5) # Buffer size 5
# 2 producers, 3 consumers
producers = [
asyncio.create_task(producer(queue, "P1", 5)),
asyncio.create_task(producer(queue, "P2", 5)),
]
consumers = [
asyncio.create_task(consumer(queue, f"C{i}"))
for i in range(3)
]
# Tunggu producers selesai
await asyncio.gather(*producers)
print("\nSemua producer selesai, menunggu queue kosong...")
# Tunggu semua item diproses
await queue.join()
# Kirim poison pills untuk stop consumers
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
print("\nā
Pipeline selesai!")
asyncio.run(main())
Tipe Queue Lainnya
import asyncio
async def main():
# 1. LIFO Queue: Last In First Out
lifo = asyncio.LifoQueue()
await lifo.put("pertama")
await lifo.put("kedua")
await lifo.put("ketiga")
print(await lifo.get()) # "ketiga" (yang terakhir masuk)
# 2. Priority Queue: Item dengan prioritas terendah duluan
pq = asyncio.PriorityQueue()
await pq.put((3, "Prioritas rendah"))
await pq.put((1, "Prioritas tinggi"))
await pq.put((2, "Prioritas sedang"))
while not pq.empty():
prio, item = await pq.get()
print(f" [{prio}] {item}")
# Output: Prioritas tinggi, sedang, rendah
asyncio.run(main())
8. Timeout & Error Handling
Timeout sangat penting dalam async programming untuk mencegah task yang tidak pernah selesai (hang). asyncio menyediakan beberapa mekanisme timeout yang robust.
import asyncio
async def operasi_lambat():
"""Operasi yang bisa sangat lambat"""
await asyncio.sleep(100) # Akan timeout sebelum selesai
return "data"
async def main():
# Method 1: asyncio.timeout (Python 3.11+)
try:
async with asyncio.timeout(3):
await operasi_lambat()
except TimeoutError:
print("ā° Timeout! Operasi terlalu lambat")
# Method 2: asyncio.wait_for
try:
hasil = await asyncio.wait_for(operasi_lambat(), timeout=3)
except TimeoutError:
print("ā° Timeout via wait_for!")
# Method 3: asyncio.timeout_at (deadline-based)
loop = asyncio.get_running_loop()
deadline = loop.time() + 3
try:
async with asyncio.timeout_at(deadline):
await operasi_lambat()
except TimeoutError:
print("ā° Deadline terlewat!")
asyncio.run(main())
Menangani Exception di Multiple Tasks
import asyncio
async def tugas_aman(n):
await asyncio.sleep(1)
return f"Task {n} OK"
async def tugas_error(n):
await asyncio.sleep(0.5)
raise ValueError(f"Error di task {n}!")
async def main():
# gather dengan return_exceptions=True
hasil = await asyncio.gather(
tugas_aman(1),
tugas_error(2),
tugas_aman(3),
return_exceptions=True # Error sebagai hasil, bukan raise
)
for i, h in enumerate(hasil):
if isinstance(h, Exception):
print(f" ā Task {i+1}: {h}")
else:
print(f" ā
Task {i+1}: {h}")
# Dengan TaskGroup (Python 3.11+)
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(tugas_aman(1))
tg.create_task(tugas_error(2)) # Ini akan membatalkan semua
tg.create_task(tugas_aman(3))
except* ValueError as eg:
for exc in eg.exceptions:
print(f" Caught: {exc}")
asyncio.run(main())
Shield: Lindungi Task dari Pembatalan
import asyncio
async def operasi_penting():
"""Operasi yang TIDAK boleh dibatalkan"""
print(" Menyimpan data ke database...")
await asyncio.sleep(3)
print(" ā
Data tersimpan!")
return "Tersimpan"
async def main():
try:
# shield melindungi inner coroutine dari CancelledError
hasil = await asyncio.wait_for(
asyncio.shield(operasi_penting()),
timeout=2
)
except TimeoutError:
print("ā° Timeout, tapi operasi_penting tetap berjalan!")
# Tunggu sebentar untuk membuktikan operasi_penting selesai
await asyncio.sleep(2)
asyncio.run(main())
9. Async HTTP & Database
Salah satu use case terbaik asyncio adalah concurrent HTTP requests dan database queries. Berikut contoh menggunakan library populer.
# Install: pip install aiohttp
import asyncio
import aiohttp
async def fetch(session, url):
"""Fetch satu URL"""
async with session.get(url) as response:
data = await response.json()
return {"url": url, "status": response.status, "data": data}
async def fetch_sem(session, url, semaphore):
"""Fetch dengan semaphore untuk limit concurrent"""
async with semaphore:
return await fetch(session, url)
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
"https://jsonplaceholder.typicode.com/users/1",
"https://jsonplaceholder.typicode.com/todos/1",
]
semaphore = asyncio.Semaphore(3) # Max 3 concurrent
async with aiohttp.ClientSession() as session:
tasks = [fetch_sem(session, url, semaphore) for url in urls]
hasil = await asyncio.gather(*tasks, return_exceptions=True)
for h in hasil:
if isinstance(h, Exception):
print(f" ā Error: {h}")
else:
print(f" ā
{h['url']}: {h['status']}")
asyncio.run(main())
# Async Database dengan aiosqlite
# Install: pip install aiosqlite
import asyncio
import aiosqlite
async def main():
async with aiosqlite.connect("example.db") as db:
# Create table
await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
nama TEXT,
email TEXT
)
""")
# Insert data
await db.execute(
"INSERT INTO users (nama, email) VALUES (?, ?)",
("Budi", "budi@email.com")
)
await db.commit()
# Select data
async with db.execute("SELECT * FROM users") as cursor:
async for row in cursor:
print(f" User: {row}")
asyncio.run(main())
Async Web Scraper
# Web scraper dengan concurrency control
import asyncio
import aiohttp
from dataclasses import dataclass
@dataclass
class PageResult:
url: str
title: str
status: int
size: int
async def scrape_page(session, url, semaphore):
async with semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
html = await resp.text()
# Simple title extraction
start = html.find("") + 7
end = html.find(" ")
title = html[start:end] if start > 6 else "No title"
return PageResult(url, title.strip(), resp.status, len(html))
except Exception as e:
return PageResult(url, f"Error: {e}", 0, 0)
async def main():
urls = [f"https://httpbin.org/delay/{i%3}" for i in range(10)]
sem = asyncio.Semaphore(5)
async with aiohttp.ClientSession() as session:
tasks = [scrape_page(session, url, sem) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
if isinstance(r, PageResult):
print(f" [{r.status}] {r.url} ā {r.title} ({r.size} bytes)")
asyncio.run(main())
10. Best Practices & Common Pitfalls
ā Yang Harus Dilakukan
| Praktik | Penjelasan |
|---|---|
Gunakan async with | Memastikan resource dibersihkan dengan benar |
| Batasi concurrency | Gunakan Semaphore agar tidak overwhelm server |
| Set timeout | Selalu pasang timeout untuk network operations |
Gunakan asyncio.run() | Cara terbaik untuk memulai event loop di main |
Gunakan TaskGroup | Lebih aman dari gather() untuk error handling |
Handle CancelledError | Bersihkan resource dengan tepat saat task dibatalkan |
ā Pitfalls yang Harus Dihindari
import asyncio
# ā PITFALL 1: Lupa await
async def pitfall_1():
# Coroutine tidak berjalan tanpa await!
result = asyncio.sleep(1) # Hanya membuat coroutine object!
print(result) # <coroutine object sleep at 0x...>
# ā PITFALL 2: Menggunakan library blocking di async code
async def pitfall_2():
import time
time.sleep(5) # MEMBLOCK seluruh event loop!
# ā
Gunakan: await asyncio.sleep(5)
# ā PITFALL 3: Menggunakan requests library
async def pitfall_3():
import requests
resp = requests.get("https://api.example.com") # BLOCKING!
# ā
Gunakan: aiohttp atau httpx dengan async client
# ā PITFALL 4: Tidak handle exception di gather
async def pitfall_4():
# Jika satu task gagal, error hanya muncul saat await
tasks = [asyncio.sleep(1) for _ in range(10)]
# ā
Gunakan return_exceptions=True atau TaskGroup
# ā PITFALL 5: Membuat task tapi tidak referensi
async def pitfall_5():
asyncio.create_task(some_coroutine()) # Task bisa di-GC!
# ā
Simpan referensi: task = asyncio.create_task(...)
# atau gunakan TaskGroup
print("Semua pitfall dijelaskan!")
asyncio.run(coro) ā Jalankan coroutine di main
await asyncio.sleep(n) ā Non-blocking sleep
asyncio.create_task(coro) ā Buat task concurrent
asyncio.gather(*coros) ā Jalankan semua bersamaan
asyncio.wait_for(coro, timeout) ā Dengan timeout
asyncio.Semaphore(n) ā Batasi concurrency
asyncio.Queue() ā Producer-consumer queue
asyncio.TaskGroup() ā Structured concurrency (3.11+)
11. Quiz Pemahaman
1. Apa yang terjadi jika kamu memanggil fungsi async def tanpa await?
2. Fungsi mana yang digunakan untuk menjalankan beberapa coroutine bersamaan?
3. Apa fungsi dari Semaphore dalam asyncio?
4. Kenapa menggunakan time.sleep() dalam kode asyncio sangat berbahaya?
5. Apa perbedaan utama antara gather() dan TaskGroup?