1. Pengenalan Concurrency vs Parallelism
Dalam pemrograman modern, kemampuan untuk menjalankan beberapa tugas secara bersamaan sangat penting untuk meningkatkan performa aplikasi. Python menyediakan beberapa modul untuk menangani hal ini: threading, multiprocessing, dan asyncio.
Sebelum masuk ke kode, penting untuk memahami perbedaan antara concurrency dan parallelism.
Concurrency vs Parallelism
| Aspek | Concurrency | Parallelism |
|---|---|---|
| Definisi | Banyak tugas yang bergantian dieksekusi | Banyak tugas yang benar-benar dieksekusi bersamaan |
| CPU Core | Bisa 1 core saja | Memerlukan multi-core |
| Cocok untuk | I/O-bound tasks (network, file) | CPU-bound tasks (kalkulasi, processing) |
| Modul Python | threading, asyncio | multiprocessing |
| Overhead | Rendah | Lebih tinggi (memory per process) |
┌──────────────────────────────────────────────────────────┐
│ CONCURRENCY                                              │
│                                                          │
│ Thread 1: ████░░░░████░░░░████░░░░                       │
│ Thread 2: ░░░░████░░░░████░░░░████                       │
│ (bergantian pada 1 core)                                 │
├──────────────────────────────────────────────────────────┤
│ PARALLELISM                                              │
│                                                          │
│ Core 1: ████████████████████████                         │
│ Core 2: ████████████████████████                         │
│ Core 3: ████████████████████████                         │
│ (benar-benar bersamaan)                                  │
└──────────────────────────────────────────────────────────┘
Kapan Menggunakan Apa?
# ============================
# I/O-BOUND → gunakan threading
# ============================
# - Download file dari internet
# - Membaca/menulis file
# - Database queries
# - API calls
# - Web scraping

# ============================
# CPU-BOUND → gunakan multiprocessing
# ============================
# - Kalkulasi matematika berat
# - Image/video processing
# - Data processing besar
# - Machine learning training
# - Compression/decompression

# ============================
# Banyak I/O + skala besar → gunakan asyncio
# ============================
# - Web server
# - WebSocket connections
# - Streaming data
# - Bot dengan banyak koneksi
2. Threading Dasar
Modul threading memungkinkan kita menjalankan beberapa fungsi secara concurrent dalam satu proses. Thread berbagi memory space yang sama, sehingga komunikasi antar thread lebih mudah tetapi juga berisiko race condition.
Membuat Thread Pertama
import threading
import time
def download_file(name: str, duration: int):
    """Simulate downloading a file by sleeping for ``duration`` seconds.

    Args:
        name: Label of the file; used only in the progress messages.
        duration: Simulated download time in seconds.
    """
    print(f"📥 Mulai download {name}...")
    time.sleep(duration)  # stand-in for the blocking network transfer
    # The completion message must be a single-line literal; the emoji was
    # previously corrupted into a line break that split the string.
    print(f"✅ {name} selesai! ({duration} detik)")
# ===== Sequential: run the downloads one after another =====
print("=== Sequential ===")
start = time.time()
for job_name, job_seconds in (("file_a.zip", 2), ("file_b.zip", 3), ("file_c.zip", 1)):
    download_file(job_name, job_seconds)
print(f"Total: {time.time() - start:.1f} detik\n")
# Expected total: about 6 seconds (2 + 3 + 1).

# ===== Concurrent: one thread per download =====
print("=== Concurrent ===")
start = time.time()
t1 = threading.Thread(target=download_file, args=("file_a.zip", 2))
t2 = threading.Thread(target=download_file, args=("file_b.zip", 3))
t3 = threading.Thread(target=download_file, args=("file_c.zip", 1))
for th in (t1, t2, t3):
    th.start()
for th in (t1, t2, t3):
    th.join()  # block until this thread has finished
print(f"Total: {time.time() - start:.1f} detik")
# Expected total: about 3 seconds (the duration of the slowest thread).
Thread dengan kwargs dan daemon
import threading
import time
def worker(task_id: int, delay: float):
    """Announce the task, sleep for ``delay`` seconds, then report completion.

    Args:
        task_id: Identifier shown in both messages.
        delay: Seconds to sleep between the two messages.
    """
    thread_name = threading.current_thread().name
    print(f" Task {task_id} mulai (thread: {thread_name})")
    time.sleep(delay)
    print(f" Task {task_id} selesai")
# Arguments can be supplied positionally (args) and by keyword (kwargs).
t = threading.Thread(
    name="MyWorker",
    target=worker,
    args=(1,),
    kwargs={"delay": 2.0},
    daemon=False,  # the default: a non-daemon thread
)
t.start()

# Inspect the thread while it is still running.
print(f"Thread name: {t.name}")
print(f"Thread alive: {t.is_alive()}")
print(f"Thread daemon: {t.daemon}")

t.join()
# Daemon thread: it stops when the main program stops.
def background_logger():
    """Print a heartbeat line with the current time every five seconds, forever."""
    heartbeat_interval = 5  # seconds between heartbeats
    while True:
        print(f" [Logger] Heartbeat ā {time.ctime()}")
        time.sleep(heartbeat_interval)
# Run the logger as a daemon so it cannot keep the process alive.
daemon = threading.Thread(target=background_logger, daemon=True)
daemon.start()
print("Main program berjalan...")
time.sleep(12)
print("Main program selesai ā daemon akan berhenti otomatis")

# Fan out several workers, then wait for every one of them.
threads = [threading.Thread(target=worker, args=(i, 1.5)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Semua task selesai!")
3. Thread Class dan Custom Thread
Cara terbaik untuk membuat thread adalah dengan meng-subclass threading.Thread dan override method run().
import threading
import time
from typing import List
class DownloadThread(threading.Thread):
    """Thread that simulates downloading one URL.

    After the thread has finished, ``result`` holds the simulated payload, or
    ``error`` holds the exception that interrupted the download.

    Args:
        url: Address to "download" from.
        filename: Target file name; also used to build the thread name.
        delay: Simulated download time in seconds (defaults to the original
            fixed two seconds).
    """

    def __init__(self, url: str, filename: str, delay: float = 2.0):
        super().__init__(name=f"Download-{filename}")
        self.url = url
        self.filename = filename
        self.delay = delay
        self.result = None
        self.error = None

    def run(self):
        """Body executed in the new thread once ``start()`` is called."""
        try:
            print(f"📥 [{self.name}] Download dari {self.url}")
            time.sleep(self.delay)  # simulated download
            self.result = f"Data dari {self.url}"
            print(f"✅ [{self.name}] Selesai: {self.filename}")
        except Exception as e:
            # Keep the exception so the caller can inspect it after join();
            # an exception raised inside a thread is otherwise lost to it.
            self.error = e
            print(f"❌ [{self.name}] Error: {e}")
class ProcessingThread(threading.Thread):
    """Thread that writes the sum of squares of ``data`` into a shared list.

    Args:
        data: Numbers to process.
        result_list: Shared list that receives the result.
        index: Slot of ``result_list`` this thread writes to; also used in
            the thread name.
    """

    def __init__(self, data: list, result_list: list, index: int):
        super().__init__(name=f"Process-{index}")
        self.index = index
        self.data = data
        self.result_list = result_list

    def run(self):
        """Compute the sum of squares and store it at ``result_list[index]``."""
        squares_total = 0
        for value in self.data:
            squares_total += value ** 2
        self.result_list[self.index] = squares_total
        print(f" [{self.name}] Result: {squares_total}")
# Drive the custom download threads.
print("=== Custom Thread ===")
urls = [
    ("https://api.example.com/data1", "data1.json"),
    ("https://api.example.com/data2", "data2.json"),
    ("https://api.example.com/data3", "data3.json"),
]
threads = [DownloadThread(url, filename) for url, filename in urls]
for t in threads:
    t.start()
for t in threads:
    t.join()
    if t.result:
        print(f" Hasil: {t.result}")

# Each processing thread writes into its own slot of a shared result list.
print("\n=== Parallel Processing ===")
data_chunks = [list(range(first, first + 10_000)) for first in (1, 10_001, 20_001)]
results = [None] * len(data_chunks)
workers = [
    ProcessingThread(chunk, results, slot)
    for slot, chunk in enumerate(data_chunks)
]
for t in workers:
    t.start()
for t in workers:
    t.join()
total = sum(results)
print(f"Total: {total:,}")
Thread-local Data
import threading
import time

# Thread-local storage: every thread sees its own, independent attributes.
local_data = threading.local()


def process_request(request_id: int):
    """Record per-thread request data, then run the work for this request.

    Args:
        request_id: Identifier stored in the calling thread's local storage.
    """
    # These attributes are visible only to the thread that sets them.
    local_data.request_id = request_id
    local_data.start_time = time.time()
    # Anything called from this thread can read local_data without
    # receiving the values as arguments.
    do_work()


def do_work():
    """Report the request id and elapsed time stored for the current thread."""
    rid = local_data.request_id
    elapsed = time.time() - local_data.start_time
    print(f" Request {rid} diproses oleh {threading.current_thread().name} "
          f"({elapsed:.3f}s)")


# Handle several requests, each in its own thread.
threads = []
for i in range(5):
    t = threading.Thread(target=process_request, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
4. Synchronization Primitives
Ketika beberapa thread mengakses resource yang sama, kita perlu mekanisme sinkronisasi untuk mencegah race condition.
Lock (Mutex)
import threading
import time
# ❌ WITHOUT A LOCK: race condition
counter_unsafe = 0


def increment_unsafe(n: int):
    """Add 1 to the global ``counter_unsafe`` ``n`` times, without any locking."""
    global counter_unsafe
    for _step in range(n):
        counter_unsafe += 1  # unsynchronized read-modify-write: not safe!


threads = [
    threading.Thread(target=increment_unsafe, args=(100_000,))
    for _ in range(10)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Tanpa lock: {counter_unsafe}")  # can come out below 1_000_000!
# ✅ WITH A LOCK: thread-safe
counter_safe = 0
lock = threading.Lock()


def increment_safe(n: int):
    """Add 1 to the global ``counter_safe`` ``n`` times while holding ``lock``.

    Args:
        n: Number of increments to perform.
    """
    global counter_safe
    for _ in range(n):
        with lock:  # released automatically when the block exits
            counter_safe += 1


threads = []
for _ in range(10):
    t = threading.Thread(target=increment_safe, args=(100_000,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
print(f"Dengan lock: {counter_safe}")  # always 1_000_000
# Lock with try/finally: the explicit form of what ``with lock2:`` does.
lock2 = threading.Lock()


def safe_operation() -> None:
    """Print a message while holding ``lock2``."""
    lock2.acquire()
    try:
        # Critical section
        print(" Melakukan operasi yang dilindungi...")
    finally:
        lock2.release()  # Runs even if the critical section raises
# RLock: a reentrant lock that the owning thread may acquire repeatedly.
rlock = threading.RLock()


def recursive_lock_example(depth: int):
    """Print ``depth`` and recurse down to 1, re-acquiring ``rlock`` at each level."""
    with rlock:
        if depth <= 0:
            return
        print(f" Depth: {depth}")
        recursive_lock_example(depth - 1)


recursive_lock_example(5)
Event dan Condition
import threading
import time
# ===== Event: a one-shot signal between threads =====
ready_event = threading.Event()


def producer():
    """Spend three seconds "preparing" data, then set ``ready_event``."""
    print("š Producer: Menyiapkan data...")
    time.sleep(3)  # simulated preparation
    print("š Producer: Data siap!")
    ready_event.set()  # wakes every thread blocked in wait()


def consumer(name: str):
    """Block until ``ready_event`` is set, reporting before and after.

    Args:
        name: Label used in the printed messages.
    """
    print(f" {name}: Menunggu data...")
    ready_event.wait()  # blocks until the event is set
    print(f" {name}: Data diterima, mulai proses!")


# Run one producer and three consumers.
t_producer = threading.Thread(target=producer)
t_consumers = []
for i in range(3):
    t_consumers.append(threading.Thread(target=consumer, args=(f"Consumer-{i}",)))
t_producer.start()
for t in t_consumers:
    t.start()
t_producer.join()
for t in t_consumers:
    t.join()
# ===== Condition: wait/notify =====
buffer = []  # items produced but not yet consumed; guarded by ``condition``
condition = threading.Condition()
_DONE = object()  # sentinel appended by the producer after its last item


def producer_cond(count: int = 5, interval: float = 1.0):
    """Produce ``count`` integers, one every ``interval`` seconds.

    Each item is appended to ``buffer`` under ``condition`` and a waiting
    consumer is notified. A sentinel is appended at the end so the consumer
    knows when to stop, whatever ``count`` is.

    Args:
        count: Number of items to produce (0, 1, ..., count - 1).
        interval: Seconds to sleep before producing each item.
    """
    for i in range(count):
        time.sleep(interval)
        with condition:
            buffer.append(i)
            print(f" Produced: {i}")
            condition.notify()  # wake a waiting consumer
    with condition:
        buffer.append(_DONE)
        condition.notify()


def consumer_cond():
    """Consume items from ``buffer`` until the producer's sentinel arrives."""
    while True:
        with condition:
            # Re-check in a loop: wait() may return with the buffer still empty.
            while not buffer:
                condition.wait()
            item = buffer.pop(0)
            if item is _DONE:
                return
            print(f" Consumed: {item}")
# Run the producer and the consumer side by side and wait for both.
t1 = threading.Thread(target=producer_cond)
t2 = threading.Thread(target=consumer_cond)
for th in (t1, t2):
    th.start()
for th in (t1, t2):
    th.join()
Semaphore dan Barrier
import threading
import time
# ===== Semaphore: cap the number of concurrent accesses =====
# Example: a database pool that allows at most 3 connections at a time.
db_semaphore = threading.Semaphore(3)


def db_query(query_id: int):
    """Run a simulated two-second query once a semaphore slot is free.

    Args:
        query_id: Identifier shown in the progress messages.
    """
    print(f" Query {query_id} menunggu koneksi...")
    db_semaphore.acquire()
    try:
        print(f" Query {query_id} mendapat koneksi")
        time.sleep(2)  # simulated query
        print(f" Query {query_id} selesai")
    finally:
        db_semaphore.release()


# Eight queries compete for the three slots.
threads = []
for i in range(8):
    threads.append(threading.Thread(target=db_query, args=(i,)))
for t in threads:
    t.start()
for t in threads:
    t.join()
# ===== Barrier: every thread waits until all of them have arrived =====
barrier = threading.Barrier(3)  # three threads must reach the barrier


def worker_barrier(worker_id: int):
    """Prepare for ``worker_id`` seconds, then wait at ``barrier`` for the others.

    Args:
        worker_id: Identifier shown in the messages; also the preparation
            time in seconds.
    """
    print(f" Worker {worker_id}: Menyiapkan...")
    preparation_seconds = worker_id  # each worker takes a different time
    time.sleep(preparation_seconds)
    print(f" Worker {worker_id}: Siap, menunggu yang lain...")
    barrier.wait()  # blocks until all three threads are here
    print(f" Worker {worker_id}: Mulai eksekusi bersama!")
# Start exactly as many workers as the barrier expects.
threads = []
for i in range(3):
    threads.append(threading.Thread(target=worker_barrier, args=(i,)))
for t in threads:
    t.start()
for t in threads:
    t.join()
5. Memahami GIL (Global Interpreter Lock)
GIL adalah mekanisme di CPython yang memastikan hanya satu thread yang mengeksekusi bytecode Python pada satu waktu. Ini adalah alasan utama mengapa threading tidak efektif untuk CPU-bound tasks.
Dampak GIL
import threading
import time
import multiprocessing
def cpu_bound(n: int) -> int:
    """Return the sum of ``i * i`` for every ``i`` in ``range(n)``.

    The work is done entirely in Python-level arithmetic, which makes it a
    CPU-intensive task.
    """
    return sum(i * i for i in range(n))
# The guard is required for the multiprocessing test: under the "spawn" start
# method each child process re-imports this module, and unguarded
# Process(...).start() calls would then run again inside every child.
if __name__ == "__main__":
    # === Test 1: Sequential ===
    start = time.time()
    cpu_bound(10_000_000)
    cpu_bound(10_000_000)
    seq_time = time.time() - start
    print(f"Sequential: {seq_time:.2f}s")

    # === Test 2: Threading (CPU-bound) ===
    start = time.time()
    t1 = threading.Thread(target=cpu_bound, args=(10_000_000,))
    t2 = threading.Thread(target=cpu_bound, args=(10_000_000,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    thread_time = time.time() - start
    print(f"Threading: {thread_time:.2f}s")  # about the same as sequential!

    # === Test 3: Multiprocessing (CPU-bound) ===
    start = time.time()
    p1 = multiprocessing.Process(target=cpu_bound, args=(10_000_000,))
    p2 = multiprocessing.Process(target=cpu_bound, args=(10_000_000,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    mp_time = time.time() - start
    print(f"Multiproc: {mp_time:.2f}s")  # roughly 50% faster!

    # Sample results on a multi-core machine:
    # Sequential: 6.50s
    # Threading: 6.48s  <- the GIL prevents parallelism!
    # Multiproc: 3.82s  <- truly parallel
CPU-bound: GIL membuat threading hampir tidak berguna. Gunakan multiprocessing.
I/O-bound: GIL di-release saat I/O operations, jadi threading tetap efektif.
Python 3.13+: Experimental free-threaded build (no-GIL) tersedia sebagai opt-in.
6. Multiprocessing Dasar
Modul multiprocessing menggunakan process terpisah, masing-masing dengan Python interpreter sendiri, sehingga bisa benar-benar parallel.
import multiprocessing
import time
import os
def worker(task_id: int):
    """Report the current process id, sleep one second, and return ``task_id`` squared.

    Args:
        task_id: Identifier shown in the message and squared for the result.
    """
    current_pid = os.getpid()
    print(f" Task {task_id} di PID {current_pid}")
    time.sleep(1)
    return task_id ** 2
from multiprocessing import Queue


# Defined at module level (outside the __main__ guard) so that child
# processes started with the "spawn" method can import it.
def worker_with_queue(task_id: int, queue: Queue):
    """Compute ``task_id ** 2`` and put ``(task_id, result)`` on ``queue``.

    Args:
        task_id: Identifier of the task; also the number that is squared.
        queue: Queue that receives the ``(task_id, result)`` tuple.
    """
    result = task_id ** 2
    queue.put((task_id, result))


if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # Create the processes
    processes = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print("Semua process selesai!")

    # Getting return values back through a Queue
    result_queue = Queue()
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker_with_queue, args=(i, result_queue))
        processes.append(p)
        p.start()

    # Collect exactly one result per process BEFORE joining: a child that has
    # put data on a queue may not exit until that data is consumed, and
    # Queue.empty() is not a reliable way to tell whether results are pending.
    results = {}
    for _ in processes:
        task_id, result = result_queue.get()
        results[task_id] = result
    for p in processes:
        p.join()
    print(f"Results: {results}")
Menggunakan Value dan Array
import multiprocessing
from multiprocessing import Value, Array
import ctypes
def increment_counter(counter: Value, lock: multiprocessing.Lock, n: int):
    """Increment ``counter.value`` ``n`` times, holding ``lock`` for each step.

    Args:
        counter: Shared object whose ``value`` attribute is incremented.
        lock: Lock acquired around every single increment.
        n: Number of increments to perform.
    """
    for _step in range(n):
        lock.acquire()
        try:
            counter.value += 1
        finally:
            lock.release()
def process_array(arr: Array, start: int, end: int):
    """Store ``i * i`` at ``arr[i]`` for every index ``i`` in ``[start, end)``.

    Args:
        arr: Indexable, writable sequence to fill in place.
        start: First index to write (inclusive).
        end: Index at which to stop (exclusive).
    """
    idx = start
    while idx < end:
        arr[idx] = idx * idx
        idx += 1
if __name__ == "__main__":
    # Shared counter ('i' is the typecode for a C int)
    counter = Value('i', 0)
    lock = multiprocessing.Lock()
    processes = [
        multiprocessing.Process(
            target=increment_counter,
            args=(counter, lock, 250_000),
        )
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 1_000_000

    # Shared array of 10 integers, filled by two processes (one half each)
    arr = Array('i', 10)
    chunk_size = 5
    processes = [
        multiprocessing.Process(
            target=process_array,
            args=(arr, first, first + chunk_size),
        )
        for first in range(0, 2 * chunk_size, chunk_size)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Array: {list(arr)}")  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
7. Process Pool dan Executor
ProcessPoolExecutor adalah cara modern dan recommended untuk menggunakan multiprocessing. Pool mengelola kumpulan worker process dan mendistribusikan tugas secara otomatis.
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math
def is_prime(n: int) -> bool:
    """Return True if ``n`` is a prime number.

    Uses trial division by odd numbers up to the integer square root of
    ``n``. ``math.isqrt`` is exact for arbitrarily large integers, whereas
    ``int(math.sqrt(n))`` goes through a float and can be off by one once
    ``n`` exceeds float precision.

    Args:
        n: Integer to test; values below 2 are never prime.
    """
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2  # 2 is the only even prime
    for i in range(3, math.isqrt(n) + 1, 2):
        if n % i == 0:
            return False
    return True
def count_primes_in_range(start: int, end: int) -> int:
    """Count the primes ``n`` with ``start <= n < end``."""
    found = 0
    for candidate in range(start, end):
        if is_prime(candidate):
            found += 1
    return found
if __name__ == "__main__":
    # Sequential baseline
    start = time.time()
    primes_seq = count_primes_in_range(1, 1_000_001)
    seq_time = time.time() - start
    print(f"Sequential: {primes_seq} primes in {seq_time:.2f}s")

    # ProcessPoolExecutor: split the interval into chunks, one task per chunk
    start = time.time()
    chunk_size = 100_000
    ranges = [
        (i, min(i + chunk_size, 1_000_001))
        for i in range(1, 1_000_001, chunk_size)
    ]
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(count_primes_in_range, s, e): (s, e)
            for s, e in ranges
        }
        primes_par = 0
        # Add up the partial counts in whatever order the chunks finish.
        for future in as_completed(futures):
            primes_par += future.result()
    par_time = time.time() - start
    print(f"Parallel: {primes_par} primes in {par_time:.2f}s")
    print(f"Speedup: {seq_time / par_time:.2f}x")
Pool.map dan Pool.starmap
import multiprocessing
import time
def process_item(item):
    """Sleep half a second to simulate work, then return ``item`` squared."""
    time.sleep(0.5)
    squared = pow(item, 2)
    return squared
def process_pair(a, b):
    """Sleep 0.3 seconds to simulate work, then return ``a + b``."""
    time.sleep(0.3)
    combined = a + b
    return combined
if __name__ == "__main__":
    items = list(range(20))

    # pool.map: the simplest option; blocks and returns results in order
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_item, items)
    print(f"map results: {results}")

    # pool.map with an explicit chunksize
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(process_item, items, chunksize=5)
    print(f"map chunked: {results}")

    # pool.starmap: for functions that take several arguments
    pairs = [(1, 2), (3, 4), (5, 6), (7, 8)]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.starmap(process_pair, pairs)
    print(f"starmap: {results}")

    # pool.imap: lazy iterator; must be consumed while the pool is still open
    with multiprocessing.Pool(processes=4) as pool:
        for squared in pool.imap(process_item, items):
            print(f" Got: {squared}")

    # pool.apply_async: non-blocking submit, results fetched afterwards
    with multiprocessing.Pool(processes=4) as pool:
        async_results = []
        for x in items:
            async_results.append(pool.apply_async(process_item, (x,)))
        results = [pending.get(timeout=10) for pending in async_results]
    print(f"async: {results}")
8. Thread Pool Executor
ThreadPoolExecutor cocok untuk I/O-bound tasks seperti HTTP requests, file operations, dan database queries.
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import urllib.request
from typing import List, Dict
def fetch_url(url: str) -> Dict:
    """Simulate fetching ``url``: sleep one second and return a fake response.

    Returns:
        A dict with the keys ``url``, ``status`` (always 200) and ``size``
        (1000 times the length of ``url``).
    """
    time.sleep(1)  # simulated network delay
    simulated_size = len(url) * 1000
    return dict(url=url, status=200, size=simulated_size)
if __name__ == "__main__":
    # Before Python 3.11, concurrent.futures.TimeoutError is a separate class
    # from the builtin TimeoutError, so it has to be caught by this name.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/albums",
        "https://api.example.com/photos",
        "https://api.example.com/todos",
    ]

    # ===== Sequential =====
    start = time.time()
    results_seq = [fetch_url(url) for url in urls]
    print(f"Sequential: {time.time() - start:.2f}s")

    # ===== ThreadPoolExecutor =====
    start = time.time()
    with ThreadPoolExecutor(max_workers=6) as executor:
        # Map each future back to its URL so failures can be reported.
        futures = {executor.submit(fetch_url, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                result = future.result()
                print(f" ✅ {result['url']} → {result['status']}")
            except Exception as e:
                print(f" ❌ {url} → {e}")
    print(f"ThreadPool: {time.time() - start:.2f}s")

    # ===== map() for the simple case =====
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_url, urls))
    print(f"Results: {len(results)} URLs fetched")

    # ===== Timeout handling =====
    with ThreadPoolExecutor(max_workers=4) as executor:
        future = executor.submit(fetch_url, urls[0])
        try:
            result = future.result(timeout=5)  # wait at most 5 seconds
            print(f"Result: {result}")
        except FutureTimeoutError:
            # cancel() only succeeds if the call has not started yet; a call
            # that is already running cannot be interrupted.
            future.cancel()
            print("Timeout!")
9. Shared Memory dan Communication
Manager Objects
import multiprocessing
from multiprocessing import Manager
import time
def worker_dict(shared_dict: dict, key: str, value: str):
    """Wait half a second, store ``value`` under ``key`` in ``shared_dict``, and report it.

    Args:
        shared_dict: Mapping that receives the entry.
        key: Key to write.
        value: Value stored under ``key``.
    """
    pause_seconds = 0.5
    time.sleep(pause_seconds)
    shared_dict[key] = value
    print(f" Set {key} = {value}")
def worker_list(shared_list: list, items: list):
    """Append every element of ``items`` to ``shared_list``, one at a time."""
    position = 0
    total = len(items)
    while position < total:
        shared_list.append(items[position])
        position += 1
if __name__ == "__main__":
    with Manager() as manager:
        # Shared dictionary: five processes each write one key
        shared_dict = manager.dict()
        processes = []
        for i in range(5):
            processes.append(
                multiprocessing.Process(
                    target=worker_dict,
                    args=(shared_dict, f"key{i}", f"val{i}"),
                )
            )
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f"Shared dict: {dict(shared_dict)}")

        # Shared list: three processes each append two items
        shared_list = manager.list()
        processes = []
        for i in range(3):
            processes.append(
                multiprocessing.Process(
                    target=worker_list,
                    args=(shared_list, [i * 10, i * 10 + 1]),
                )
            )
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        print(f"Shared list: {sorted(shared_list)}")

        # Shared namespace: attribute-style shared values
        ns = manager.Namespace()
        ns.app_name = "MyApp"
        ns.version = "1.0"
        print(f"Namespace: {ns.app_name} v{ns.version}")
Pipe untuk Komunikasi
import multiprocessing
from multiprocessing import Pipe
import time
def sender(conn):
    """Send four messages through ``conn``, then a ``None`` end marker, and close it.

    Args:
        conn: Connection object providing ``send()`` and ``close()``.
    """
    for msg in ("Hello", "World", "From", "Sender"):
        conn.send(msg)
        print(f" Sent: {msg}")
        time.sleep(0.5)
    conn.send(None)  # tells the receiver that nothing more will follow
    conn.close()
def receiver(conn):
    """Print every message received on ``conn`` until ``None`` arrives, then close it.

    Args:
        conn: Connection object providing ``recv()`` and ``close()``.
    """
    msg = conn.recv()
    while msg is not None:
        print(f" Received: {msg}")
        msg = conn.recv()
    conn.close()
if __name__ == "__main__":
    # Pipe() returns the two connected ends of one channel.
    parent_conn, child_conn = Pipe()
    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))
    for proc in (p1, p2):
        proc.start()
    for proc in (p1, p2):
        proc.join()
    print("Communication selesai!")
10. Asyncio Overview
asyncio adalah alternatif modern untuk threading pada I/O-bound tasks yang memerlukan skala besar (ribuan concurrent operations).
import asyncio
import time
async def fetch_data(name: str, delay: float) -> str:
    """Simulate an asynchronous I/O operation.

    Args:
        name: Label of the data source; used in the messages and the result.
        delay: Seconds to wait without blocking the event loop.

    Returns:
        The string ``"Data dari <name>"``.
    """
    print(f" 📥 {name}: Mulai fetch...")
    await asyncio.sleep(delay)  # non-blocking: other tasks run meanwhile
    print(f" ✅ {name}: Selesai!")
    return f"Data dari {name}"
async def main():
    """Compare sequential and concurrent awaits, then demonstrate a timeout."""
    # Sequential: each await finishes before the next one starts
    print("=== Sequential ===")
    start = time.time()
    await fetch_data("API-1", 2)
    await fetch_data("API-2", 3)
    print(f" Time: {time.time() - start:.1f}s\n")

    # Concurrent: gather() runs the coroutines together
    print("=== Concurrent ===")
    start = time.time()
    results = await asyncio.gather(
        fetch_data("API-1", 2),
        fetch_data("API-2", 3),
        fetch_data("API-3", 1),
    )
    print(f" Time: {time.time() - start:.1f}s")
    print(f" Results: {results}")

    # A task with a timeout
    print("\n=== With Timeout ===")
    try:
        await asyncio.wait_for(
            fetch_data("Slow-API", 10),
            timeout=3.0,
        )
    except asyncio.TimeoutError:
        # asyncio.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; this name works on every version.
        print(" ⏰ Timeout!")


asyncio.run(main())
11. Studi Kasus
Parallel Web Scraper
"""
Parallel Web Scraper — Menggunakan ThreadPoolExecutor
untuk mengunduh dan memproses halaman web secara concurrent.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime
@dataclass
class ScrapeResult:
    """Outcome of scraping a single URL."""

    url: str  # address that was requested
    status: int  # status code reported for the request
    content_length: int  # size in bytes; scrape_url stores 0 when status is not 200
    elapsed: float  # seconds spent producing this result
    timestamp: str  # ISO-format time at which the result was created
def scrape_url(url: str) -> ScrapeResult:
    """Simulate scraping one URL with a random delay and a random status.

    Args:
        url: Address to "scrape".

    Returns:
        A ScrapeResult whose status is drawn from 200 (most likely), 404 and
        500; the content length is non-zero only for status 200.
    """
    began = time.time()
    # Simulated network request
    import random
    time.sleep(random.uniform(0.5, 2.0))
    status = random.choice([200, 200, 200, 404, 500])
    if status == 200:
        content_length = random.randint(1000, 50000)
    else:
        content_length = 0
    return ScrapeResult(
        url=url,
        status=status,
        content_length=content_length,
        elapsed=time.time() - began,
        timestamp=datetime.now().isoformat(),
    )
def scrape_batch(urls: List[str], max_workers: int = 10) -> List[ScrapeResult]:
    """Scrape several URLs concurrently with a thread pool.

    A progress line is printed for every URL as soon as it finishes. URLs
    whose scrape raised an exception are reported on stdout and left out of
    the returned list.

    Args:
        urls: Addresses to scrape.
        max_workers: Maximum number of worker threads.

    Returns:
        The results of all scrapes that completed without raising, in
        completion order.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its URL so failures can name the URL.
        future_to_url = {
            executor.submit(scrape_url, url): url
            for url in urls
        }
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                # as_completed only yields finished futures, so result()
                # returns (or raises) immediately; no timeout is needed.
                result = future.result()
            except Exception as e:
                print(f" ❌ {url} → {e}")
                continue
            results.append(result)
            status_icon = "✅" if result.status == 200 else "⚠️"
            print(f" {status_icon} {url} → {result.status} "
                  f"({result.elapsed:.2f}s, {result.content_length} bytes)")
    return results
if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(20)]
    print(f"🕷️ Scraping {len(urls)} URLs...\n")

    start = time.time()
    results = scrape_batch(urls, max_workers=8)
    elapsed = time.time() - start

    # Anything that is not a 200 response (including scrapes that raised and
    # are therefore missing from ``results``) counts as failed.
    success = sum(1 for r in results if r.status == 200)
    print("\n📊 Report:")
    print(f" Total: {len(urls)}")
    print(f" Success: {success}")
    print(f" Failed: {len(urls) - success}")
    print(f" Time: {elapsed:.2f}s")
    print(f" Avg/URL: {elapsed/len(urls):.2f}s")
Image Processor
"""
Parallel Image Processor — Menggunakan ProcessPoolExecutor
untuk memproses gambar secara parallel (CPU-bound).
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import time
import os
def process_image(image_path: str) -> dict:
    """Simulate CPU-heavy image processing on a 1 MB in-memory buffer.

    Args:
        image_path: Path label of the image; it is only echoed back.

    Returns:
        A dict with the keys ``path``, ``pid`` (the current process id),
        ``elapsed`` (seconds taken) and ``output_size`` (buffer length).
    """
    began = time.time()
    worker_pid = os.getpid()
    # Simulated heavy computation on 1 MB of zeroed data
    pixels = bytearray(1_000_000)
    for offset in range(0, len(pixels), 100):
        brightened = pixels[offset] + 10  # simulated brightness increase
        pixels[offset] = brightened if brightened < 255 else 255
    took = time.time() - began
    return {
        "path": image_path,
        "pid": worker_pid,
        "elapsed": took,
        "output_size": len(pixels),
    }
if __name__ == "__main__":
    # Simulated list of images; only the first ten are used for the demo
    images = [f"photos/image_{i:04d}.jpg" for i in range(50)]
    demo_batch = images[:10]

    # Sequential
    start = time.time()
    for img in demo_batch:
        process_image(img)
    seq_time = time.time() - start

    # Parallel: one task per image, spread over all CPU cores
    start = time.time()
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = {}
        for img in demo_batch:
            futures[executor.submit(process_image, img)] = img
        for future in as_completed(futures):
            result = future.result()
    par_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel: {par_time:.2f}s")
    print(f"Speedup: {seq_time/par_time:.2f}x")
12. Quiz: Uji Pemahamanmu!
Setelah membaca tutorial di atas, jawablah 5 pertanyaan berikut: