
Python Threading & Multiprocessing

A complete tutorial on threading and multiprocessing in Python: threads, processes, synchronization, executor pools, the GIL, and best practices for concurrent and parallel applications

1. Introduction to Concurrency vs Parallelism

In modern programming, running several tasks at once is often essential for application performance. Python ships three standard-library modules for this: threading, multiprocessing, and asyncio.

Before diving into code, it helps to understand the difference between concurrency and parallelism.

Concurrency vs Parallelism

Aspect          Concurrency                        Parallelism
Definition      Many tasks take turns executing    Many tasks execute at truly the same time
CPU cores       One core is enough                 Requires multiple cores
Best for        I/O-bound tasks (network, file)    CPU-bound tasks (calculation, processing)
Python modules  threading, asyncio                 multiprocessing
Overhead        Low                                Higher (memory per process)

Diagram: Concurrency vs Parallelism
┌──────────────────────────────────────────────────────────┐
│                    CONCURRENCY                           │
│                                                          │
│  Thread 1: ████░░░░████░░░░████░░░░                      │
│  Thread 2: ░░░░████░░░░████░░░░████                      │
│           (taking turns on 1 core)                       │
├──────────────────────────────────────────────────────────┤
│                    PARALLELISM                           │
│                                                          │
│  Core 1:   ████████████████████████                      │
│  Core 2:   ████████████████████████                      │
│  Core 3:   ████████████████████████                      │
│           (truly simultaneous)                           │
└──────────────────────────────────────────────────────────┘

When to Use What?

Python — Choosing an Approach
# ============================
# I/O-BOUND → use threading
# ============================
# - Downloading files from the internet
# - Reading/writing files
# - Database queries
# - API calls
# - Web scraping

# ============================
# CPU-BOUND → use multiprocessing
# ============================
# - Heavy mathematical computation
# - Image/video processing
# - Large-scale data processing
# - Machine learning training
# - Compression/decompression

# ============================
# Lots of I/O at large scale → use asyncio
# ============================
# - Web servers
# - WebSocket connections
# - Streaming data
# - Bots with many connections

2. Threading Basics

The threading module lets us run several functions concurrently within a single process. Threads share the same memory space, which makes communication between them easy but also opens the door to race conditions.

Creating Your First Thread

Python — Thread Basic
import threading
import time

def download_file(name: str, duration: int):
    """Simulate downloading a file."""
    print(f"📥 Starting download of {name}...")
    time.sleep(duration)  # Simulated download
    print(f"✅ {name} done! ({duration} s)")

# ===== Sequential (one at a time) =====
print("=== Sequential ===")
start = time.time()
download_file("file_a.zip", 2)
download_file("file_b.zip", 3)
download_file("file_c.zip", 1)
print(f"Total: {time.time() - start:.1f} s\n")
# Total: ~6 seconds

# ===== Concurrent dengan Thread =====
print("=== Concurrent ===")
start = time.time()

t1 = threading.Thread(target=download_file, args=("file_a.zip", 2))
t2 = threading.Thread(target=download_file, args=("file_b.zip", 3))
t3 = threading.Thread(target=download_file, args=("file_c.zip", 1))

t1.start()
t2.start()
t3.start()

t1.join()  # Wait for t1 to finish
t2.join()  # Wait for t2 to finish
t3.join()  # Wait for t3 to finish

print(f"Total: {time.time() - start:.1f} s")
# Total: ~3 seconds (the duration of the longest thread)

Threads with kwargs and daemon

Python — Thread Options
import threading
import time

def worker(task_id: int, delay: float):
    """Worker thread."""
    print(f"  Task {task_id} started (thread: {threading.current_thread().name})")
    time.sleep(delay)
    print(f"  Task {task_id} finished")

# Passing arguments via kwargs
t = threading.Thread(
    target=worker,
    args=(1,),
    kwargs={"delay": 2.0},
    name="MyWorker",
    daemon=False  # Default: non-daemon
)

t.start()
print(f"Thread name: {t.name}")
print(f"Thread alive: {t.is_alive()}")
print(f"Thread daemon: {t.daemon}")

t.join()

# Daemon thread: stopped abruptly when the main program exits
def background_logger():
    while True:
        print(f"  [Logger] Heartbeat — {time.ctime()}")
        time.sleep(5)

daemon = threading.Thread(target=background_logger, daemon=True)
daemon.start()

print("Main program running...")
time.sleep(12)
print("Main work done; the daemon thread will be stopped automatically when the program exits")

# Multiple threads in a loop
threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i, 1.5))
    threads.append(t)
    t.start()

# Wait for all of them to finish
for t in threads:
    t.join()

print("All tasks finished!")
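
Because a daemon thread is stopped abruptly at interpreter exit, it gets no chance to clean up. A common alternative is a regular thread that polls a threading.Event and exits on request. The sketch below assumes that pattern; the names heartbeat, stop_event, and beats are illustrative and not part of the tutorial's code.

```python
import threading
import time

def heartbeat(stop_event: threading.Event, interval: float, beats: list) -> None:
    """Record a timestamp every `interval` seconds until asked to stop."""
    # Event.wait() returns False on timeout and True once the event is set,
    # so the loop ends promptly instead of sleeping out a full interval.
    while not stop_event.wait(timeout=interval):
        beats.append(time.time())

stop_event = threading.Event()
beats: list = []
t = threading.Thread(target=heartbeat, args=(stop_event, 0.05, beats))
t.start()

time.sleep(0.3)      # the main program does its own work here
stop_event.set()     # ask the worker to finish
t.join(timeout=2)    # wait for a clean exit

print(f"Recorded {len(beats)} heartbeats; thread alive: {t.is_alive()}")
```

The thread is non-daemon here, so the program would wait for it on exit; setting the event is what lets it return.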

3. Thread Class and Custom Threads

Another way to create a thread is to subclass threading.Thread and override its run() method. This is convenient when a thread needs to carry its own state, such as a result or a captured error.

Python — Custom Thread Class
import threading
import time
from typing import List

class DownloadThread(threading.Thread):
    """Thread that simulates a download."""

    def __init__(self, url: str, filename: str):
        super().__init__(name=f"Download-{filename}")
        self.url = url
        self.filename = filename
        self.result = None
        self.error = None

    def run(self):
        """Runs in the new thread after thread.start() is called."""
        try:
            print(f"📥 [{self.name}] Downloading from {self.url}")
            time.sleep(2)  # Simulated download
            self.result = f"Data from {self.url}"
            print(f"✅ [{self.name}] Done: {self.filename}")
        except Exception as e:
            self.error = e
            print(f"❌ [{self.name}] Error: {e}")

class ProcessingThread(threading.Thread):
    """Thread that processes one chunk of data."""

    def __init__(self, data: list, result_list: list, index: int):
        super().__init__(name=f"Process-{index}")
        self.data = data
        self.result_list = result_list
        self.index = index

    def run(self):
        # Simulated processing
        result = sum(x ** 2 for x in self.data)
        self.result_list[self.index] = result
        print(f"  [{self.name}] Result: {result}")

# Using the custom thread
print("=== Custom Thread ===")
urls = [
    ("https://api.example.com/data1", "data1.json"),
    ("https://api.example.com/data2", "data2.json"),
    ("https://api.example.com/data3", "data3.json"),
]

threads = []
for url, filename in urls:
    t = DownloadThread(url, filename)
    threads.append(t)
    t.start()

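for t in threads:
    t.join()
    if t.result:
        print(f"  Result: {t.result}")

# Splitting work across threads with a shared result list.
# Note: CPU-bound work like this does not actually run in parallel under the GIL;
# the example only shows how to collect per-thread results.
print("\n=== Parallel Processing ===")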
data_chunks = [
    list(range(1, 10001)),
    list(range(10001, 20001)),
    list(range(20001, 30001)),
]
results = [None] * len(data_chunks)

workers = []
for i, chunk in enumerate(data_chunks):
    t = ProcessingThread(chunk, results, i)
    workers.append(t)
    t.start()

for t in workers:
    t.join()

total = sum(results)
print(f"Total: {total:,}")

Thread-local Data

Python — Thread Local
import threading
import time

# Thread-local storage: data private to each thread
local_data = threading.local()

def process_request(request_id: int):
    # Set data that belongs to this thread only
    local_data.request_id = request_id
    local_data.start_time = time.time()

    # Every function called from this thread can read local_data
    do_work()

def do_work():
    # Read the thread-local data
    rid = local_data.request_id
    elapsed = time.time() - local_data.start_time
    print(f"  Request {rid} handled by {threading.current_thread().name} "
          f"({elapsed:.3f}s)")

# Run several requests in different threads
threads = []
for i in range(5):
    t = threading.Thread(target=process_request, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()
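
To make the isolation visible, the following sketch has four threads write to the same threading.local attribute and only read it back after all of them have written. The barrier, the seen dictionary, and the thread names are assumptions added for the demonstration.

```python
import threading

local_data = threading.local()
all_written = threading.Barrier(4)   # forces every thread to write before any reads
seen = {}                            # thread name -> value that thread read back
seen_lock = threading.Lock()

def handle(request_id: int) -> None:
    local_data.request_id = request_id   # stored separately for each thread
    all_written.wait()                   # by now all 4 threads have written
    with seen_lock:
        seen[threading.current_thread().name] = local_data.request_id

threads = [
    threading.Thread(target=handle, args=(i,), name=f"req-{i}") for i in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never assigned request_id, so the attribute is missing here.
main_has_value = hasattr(local_data, "request_id")
print(seen)
print(main_has_value)   # False
```

Each thread reads back its own value even though the others wrote to the same attribute name in between.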

4. Synchronization Primitives

When several threads access the same resource, we need a synchronization mechanism to prevent race conditions.

Lock (Mutex)

Python — Lock
import threading
import time

# ❌ WITHOUT A LOCK: race condition
counter_unsafe = 0

def increment_unsafe(n: int):
    global counter_unsafe
    for _ in range(n):
        counter_unsafe += 1  # Not safe: read-modify-write is not atomic

threads = []
for _ in range(10):
    t = threading.Thread(target=increment_unsafe, args=(100_000,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

print(f"Without lock: {counter_unsafe}")  # May be less than 1_000_000 (depends on timing and interpreter version)

# ✅ WITH A LOCK: thread-safe
counter_safe = 0
lock = threading.Lock()

def increment_safe(n: int):
    global counter_safe
    for _ in range(n):
        with lock:  # The lock is released automatically on exit
            counter_safe += 1

threads = []
for _ in range(10):
    t = threading.Thread(target=increment_safe, args=(100_000,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

print(f"With lock: {counter_safe}")  # Always 1_000_000

# Lock with try/finally
lock2 = threading.Lock()

def safe_operation():
    lock2.acquire()
    try:
        # Critical section
        print("  Performing the protected operation...")
    finally:
        lock2.release()  # Make sure it is always released

# RLock: reentrant lock (the same thread can acquire it multiple times)
rlock = threading.RLock()

def recursive_lock_example(depth: int):
    with rlock:
        if depth > 0:
            print(f"  Depth: {depth}")
            recursive_lock_example(depth - 1)

recursive_lock_example(5)
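
A blocking acquire() waits forever if the lock is never released. Lock.acquire() also accepts a timeout and returns False when it expires, which gives the caller a chance to back off. This is a minimal sketch of that behavior; hold_lock and the two events are illustrative helpers, not part of the tutorial.

```python
import threading

lock = threading.Lock()
lock_taken = threading.Event()
may_release = threading.Event()

def hold_lock() -> None:
    """Hold the lock until the main thread allows release."""
    with lock:
        lock_taken.set()
        may_release.wait()

holder = threading.Thread(target=hold_lock)
holder.start()
lock_taken.wait()                       # the lock is now held by `holder`

# acquire() returns False instead of blocking forever once the timeout expires.
got_it_while_held = lock.acquire(timeout=0.1)

may_release.set()
holder.join()

# With the holder finished, the same call succeeds.
got_it_after_release = lock.acquire(timeout=0.1)
if got_it_after_release:
    lock.release()

print(got_it_while_held, got_it_after_release)   # False True
```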

Event and Condition

Python — Event & Condition
import threading
import time

# ===== Event: signaling between threads =====
ready_event = threading.Event()

def producer():
    print("🏭 Producer: preparing data...")
    time.sleep(3)  # Simulated preparation
    print("🏭 Producer: data ready!")
    ready_event.set()  # Signal that the data is ready

def consumer(name: str):
    print(f"  {name}: waiting for data...")
    ready_event.wait()  # Block until the event is set
    print(f"  {name}: data received, starting work!")

# Run
t_producer = threading.Thread(target=producer)
t_consumers = [threading.Thread(target=consumer, args=(f"Consumer-{i}",)) for i in range(3)]

t_producer.start()
for t in t_consumers:
    t.start()

t_producer.join()
for t in t_consumers:
    t.join()

# ===== Condition: wait/notify =====
buffer = []
condition = threading.Condition()

def producer_cond():
    for i in range(5):
        time.sleep(1)
        with condition:
            buffer.append(i)
            print(f"  Produced: {i}")
            condition.notify()  # Notify waiting consumer

def consumer_cond():
    while True:
        with condition:
            while not buffer:
                condition.wait()  # Wait for notification
            item = buffer.pop(0)
            print(f"  Consumed: {item}")
            if item == 4:
                return

t1 = threading.Thread(target=producer_cond)
t2 = threading.Thread(target=consumer_cond)
t1.start()
t2.start()
t1.join()
t2.join()
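
For plain producer/consumer hand-offs, the standard library's queue.Queue wraps the same wait/notify logic shown above. The sketch below is one possible arrangement, assuming a None sentinel to mark the end of the stream; the function and variable names are illustrative.

```python
import queue
import threading

SENTINEL = None   # marks the end of the stream (an illustrative convention)

def produce(q: queue.Queue, n: int) -> None:
    for i in range(n):
        q.put(i)          # blocks while the queue is full
    q.put(SENTINEL)

def consume(q: queue.Queue, out: list) -> None:
    while True:
        item = q.get()    # blocks until an item is available
        if item is SENTINEL:
            break
        out.append(item * 2)

q = queue.Queue(maxsize=2)     # a small bound so the producer must wait at times
consumed = []
threads = [
    threading.Thread(target=produce, args=(q, 5)),
    threading.Thread(target=consume, args=(q, consumed)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(consumed)   # [0, 2, 4, 6, 8]
```

With one producer and one consumer the FIFO order is preserved; with several consumers, each would need its own sentinel.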

Semaphore and Barrier

Python — Semaphore & Barrier
import threading
import time

# ===== Semaphore: limits the number of concurrent accesses =====
# Example: a database pool with at most 3 connections
db_semaphore = threading.Semaphore(3)  # Max 3 concurrent

def db_query(query_id: int):
    print(f"  Query {query_id} waiting for a connection...")
    with db_semaphore:
        print(f"  Query {query_id} got a connection")
        time.sleep(2)  # Simulated query
        print(f"  Query {query_id} finished")

threads = [threading.Thread(target=db_query, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# ===== Barrier: every thread waits until all are ready =====
barrier = threading.Barrier(3)  # 3 threads must arrive

def worker_barrier(worker_id: int):
    print(f"  Worker {worker_id}: preparing...")
    time.sleep(worker_id)  # Different preparation times
    print(f"  Worker {worker_id}: ready, waiting for the others...")
    barrier.wait()  # Block until all 3 threads are ready
    print(f"  Worker {worker_id}: starting together!")

threads = [threading.Thread(target=worker_barrier, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
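
One way to check that a semaphore really caps concurrency is to count how many threads are inside the guarded section at once. The sketch below does that with a BoundedSemaphore; the counters active and peak and the constant MAX_CONCURRENT are assumptions added for the measurement.

```python
import threading
import time

MAX_CONCURRENT = 3
slots = threading.BoundedSemaphore(MAX_CONCURRENT)
state_lock = threading.Lock()   # protects the two counters below
active = 0
peak = 0

def query(_: int) -> None:
    global active, peak
    with slots:                       # at most MAX_CONCURRENT threads get past here
        with state_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)              # simulated work while holding a slot
        with state_lock:
            active -= 1

threads = [threading.Thread(target=query, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"peak concurrency: {peak} (limit {MAX_CONCURRENT})")
```

BoundedSemaphore additionally raises ValueError if it is released more times than it was acquired, which helps catch bookkeeping mistakes.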

5. Understanding the GIL (Global Interpreter Lock)

The GIL is a mechanism in CPython that ensures only one thread executes Python bytecode at a time. It is the main reason threading does not speed up CPU-bound tasks.

Impact of the GIL

Python — GIL Demo
import threading
import time
import multiprocessing

def cpu_bound(n: int) -> int:
    """CPU-intensive task."""
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":  # Required: child processes may re-import this module
    # === Test 1: Sequential ===
    start = time.time()
    r1 = cpu_bound(10_000_000)
    r2 = cpu_bound(10_000_000)
    seq_time = time.time() - start
    print(f"Sequential: {seq_time:.2f}s")

    # === Test 2: Threading (CPU-bound) ===
    start = time.time()
    t1 = threading.Thread(target=cpu_bound, args=(10_000_000,))
    t2 = threading.Thread(target=cpu_bound, args=(10_000_000,))
    t1.start(); t2.start()
    t1.join(); t2.join()
    thread_time = time.time() - start
    print(f"Threading:  {thread_time:.2f}s")  # About the same as sequential!

    # === Test 3: Multiprocessing (CPU-bound) ===
    start = time.time()
    p1 = multiprocessing.Process(target=cpu_bound, args=(10_000_000,))
    p2 = multiprocessing.Process(target=cpu_bound, args=(10_000_000,))
    p1.start(); p2.start()
    p1.join(); p2.join()
    mp_time = time.time() - start
    print(f"Multiproc:  {mp_time:.2f}s")  # Approaches half the time when two cores are free

# Results on a multi-core machine:
# Sequential: 6.50s
# Threading:  6.48s  ← The GIL prevents parallelism!
# Multiproc:  3.82s  ← Truly parallel
⚠️ GIL: When Does It Matter?

CPU-bound: The GIL makes threading nearly useless. Use multiprocessing.
I/O-bound: The GIL is released during I/O operations, so threading remains effective.
Python 3.13+: An experimental free-threaded (no-GIL) build is available as an opt-in.
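
To see which situation applies to the interpreter at hand, the following sketch reports whether it is a free-threaded build and whether the GIL is currently active. It relies on sysconfig's Py_GIL_DISABLED variable and on sys._is_gil_enabled(), which only exists from Python 3.13; the helper name gil_status is an assumption.

```python
import sys
import sysconfig

def gil_status() -> dict:
    """Report whether this interpreter was built free-threaded and whether the GIL is on."""
    # Py_GIL_DISABLED is 1 on free-threaded builds and 0 or None otherwise.
    free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
    # sys._is_gil_enabled() was added in Python 3.13; older versions always have the GIL.
    checker = getattr(sys, "_is_gil_enabled", None)
    gil_enabled = checker() if checker is not None else True
    return {"free_threaded_build": free_threaded_build, "gil_enabled": gil_enabled}

status = gil_status()
print(status)
```

On a standard build this reports the GIL as enabled, which is the case the benchmark above describes.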

6. Multiprocessing Basics

The multiprocessing module uses separate processes, each with its own Python interpreter (and its own GIL), so work can run truly in parallel.

Python — Multiprocessing Basics
import multiprocessing
import time
import os

def worker(task_id: int):
    """Worker that runs in a separate process."""
    pid = os.getpid()
    print(f"  Task {task_id} in PID {pid}")
    time.sleep(1)
    return task_id * task_id

def worker_with_queue(task_id: int, queue: multiprocessing.Queue):
    """Send (task_id, result) back through the queue.

    Defined at module level so child processes can import it
    under the spawn start method.
    """
    result = task_id ** 2
    queue.put((task_id, result))

if __name__ == "__main__":
    print(f"Main process PID: {os.getpid()}")

    # Create the processes
    processes = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("All processes finished!")

    # Getting return values back through a Queue
    queue = multiprocessing.Queue()
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker_with_queue, args=(i, queue))
        processes.append(p)
        p.start()

    # Collect exactly one result per worker before joining:
    # queue.empty() is unreliable, and joining first can deadlock
    # while a child is still flushing data into the queue.
    results = {}
    for _ in processes:
        task_id, result = queue.get()
        results[task_id] = result

    for p in processes:
        p.join()

    print(f"Results: {results}")
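
The `if __name__ == "__main__":` guard matters because of how child processes are started. Under the spawn start method (the default on Windows and macOS) each child re-imports the main module, so unguarded process creation would run again in every child, and worker functions must be importable at module level. The sketch below only inspects the start method in use; describe_start_method is an illustrative helper name.

```python
import multiprocessing

def describe_start_method() -> dict:
    """Return the default start method and the ones this platform supports."""
    return {
        "default": multiprocessing.get_start_method(),
        "available": multiprocessing.get_all_start_methods(),
    }

info = describe_start_method()
print(f"default:   {info['default']}")
print(f"available: {info['available']}")
```

Code that works under fork can still fail under spawn, so keeping workers at module level and process creation behind the guard is the portable choice.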

Using Value and Array

Python — Shared Memory
import multiprocessing
from multiprocessing import Value, Array

def increment_counter(counter: Value, lock: multiprocessing.Lock, n: int):
    for _ in range(n):
        with lock:
            counter.value += 1

def process_array(arr: Array, start: int, end: int):
    for i in range(start, end):
        arr[i] = i * i

if __name__ == "__main__":
    # Shared counter
    counter = Value('i', 0)  # 'i' = integer
    lock = multiprocessing.Lock()

    processes = []
    for _ in range(4):
        p = multiprocessing.Process(
            target=increment_counter,
            args=(counter, lock, 250_000)
        )
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Counter: {counter.value}")  # 1_000_000

    # Shared array
    arr = Array('i', 10)  # Array of 10 integers
    processes = []
    chunk_size = 5
    for i in range(2):
        p = multiprocessing.Process(
            target=process_array,
            args=(arr, i * chunk_size, (i + 1) * chunk_size)
        )
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Array: {list(arr)}")  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

7. Process Pool and Executor

ProcessPoolExecutor is the modern, recommended way to use multiprocessing. The pool manages a set of worker processes and distributes tasks among them automatically.

Python — ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math

def is_prime(n: int) -> bool:
    """Check whether n is a prime number."""
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False
    for i in range(3, int(math.sqrt(n)) + 1, 2):
        if n % i == 0:
            return False
    return True

def count_primes_in_range(start: int, end: int) -> int:
    """Count the primes in the half-open range [start, end)."""
    return sum(1 for n in range(start, end) if is_prime(n))

if __name__ == "__main__":

    # Sequential
    start = time.time()
    primes_seq = count_primes_in_range(1, 1_000_001)
    seq_time = time.time() - start
    print(f"Sequential: {primes_seq} primes in {seq_time:.2f}s")

    # ProcessPoolExecutor
    start = time.time()
    chunk_size = 100_000
    ranges = [
        (i, min(i + chunk_size, 1_000_001))
        for i in range(1, 1_000_001, chunk_size)
    ]

    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = {
            executor.submit(count_primes_in_range, s, e): (s, e)
            for s, e in ranges
        }

        primes_par = 0
        for future in as_completed(futures):
            primes_par += future.result()

    par_time = time.time() - start
    print(f"Parallel:   {primes_par} primes in {par_time:.2f}s")
    print(f"Speedup:    {seq_time / par_time:.2f}x")
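
The pool can only distribute work that has been split into independent pieces, and the split must cover the input exactly once for the per-chunk counts to add up. The helper below is a small sketch of the half-open chunking used in the example above; the name make_ranges is an assumption.

```python
def make_ranges(start: int, stop: int, chunk_size: int) -> list:
    """Split [start, stop) into consecutive half-open (lo, hi) ranges."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    return [(lo, min(lo + chunk_size, stop)) for lo in range(start, stop, chunk_size)]

ranges = make_ranges(1, 26, 10)
print(ranges)   # [(1, 11), (11, 21), (21, 26)]

# Every number appears in exactly one chunk, so partial results can simply be summed.
covered = [n for lo, hi in ranges for n in range(lo, hi)]
print(covered == list(range(1, 26)))   # True
```

Fewer, larger chunks reduce inter-process overhead; more, smaller chunks balance the load better when some chunks are slower than others.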

Pool.map and Pool.starmap

Python — Pool Methods
import multiprocessing
import time

def process_item(item):
    """Simulate processing a single item."""
    time.sleep(0.5)
    return item ** 2

def process_pair(a, b):
    """Function with multiple arguments."""
    time.sleep(0.3)
    return a + b

if __name__ == "__main__":
    items = list(range(20))

    # pool.map: simple and easy
    with multiprocessing.Pool(4) as pool:
        results = pool.map(process_item, items)
    print(f"map results: {results}")

    # pool.map with chunksize
    with multiprocessing.Pool(4) as pool:
        results = pool.map(process_item, items, chunksize=5)
    print(f"map chunked: {results}")

    # pool.starmap: for functions with multiple arguments
    pairs = [(1, 2), (3, 4), (5, 6), (7, 8)]
    with multiprocessing.Pool(4) as pool:
        results = pool.starmap(process_pair, pairs)
    print(f"starmap: {results}")

    # pool.imap: lazy iterator (saves memory)
    with multiprocessing.Pool(4) as pool:
        for result in pool.imap(process_item, items):
            print(f"  Got: {result}")

    # pool.apply_async: non-blocking
    with multiprocessing.Pool(4) as pool:
        async_results = [pool.apply_async(process_item, (x,)) for x in items]
        results = [r.get(timeout=10) for r in async_results]
    print(f"async: {results}")

8. Thread Pool Executor

ThreadPoolExecutor suits I/O-bound tasks such as HTTP requests, file operations, and database queries.

Python — ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from typing import Dict

def fetch_url(url: str) -> Dict:
    """Simulate fetching a URL."""
    time.sleep(1)  # Simulated network delay
    return {"url": url, "status": 200, "size": len(url) * 1000}

if __name__ == "__main__":
    urls = [
        "https://api.example.com/users",
        "https://api.example.com/posts",
        "https://api.example.com/comments",
        "https://api.example.com/albums",
        "https://api.example.com/photos",
        "https://api.example.com/todos",
    ]

    # ===== Sequential =====
    start = time.time()
    results_seq = [fetch_url(url) for url in urls]
    print(f"Sequential: {time.time() - start:.2f}s")

    # ===== ThreadPoolExecutor =====
    start = time.time()
    with ThreadPoolExecutor(max_workers=6) as executor:
        futures = {executor.submit(fetch_url, url): url for url in urls}

        for future in as_completed(futures):
            url = futures[future]
            try:
                result = future.result()
                print(f"  ✅ {result['url']}: {result['status']}")
            except Exception as e:
                print(f"  ❌ {url}: {e}")

    print(f"ThreadPool: {time.time() - start:.2f}s")

    # ===== map() for simple cases =====
    with ThreadPoolExecutor(max_workers=3) as executor:
        results = list(executor.map(fetch_url, urls))
    print(f"Results: {len(results)} URLs fetched")

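    # ===== Timeout handling =====
    # Before Python 3.11, Future.result() raises concurrent.futures.TimeoutError,
    # which is distinct from the built-in TimeoutError, so import it explicitly.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    with ThreadPoolExecutor(max_workers=4) as executor:
        future = executor.submit(fetch_url, urls[0])
        try:
            result = future.result(timeout=5)  # 5-second timeout
            print(f"Result: {result}")
        except FutureTimeoutError:
            future.cancel()  # Only cancels if the task has not started running
            print("Timeout!")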
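
The two collection styles used above differ in ordering: executor.map() returns results in input order, while as_completed() yields futures as they finish. The sketch below makes the difference observable with staggered delays; slow_echo and the delay values are illustrative.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

def slow_echo(delay: float) -> float:
    """Sleep for `delay` seconds, then return it."""
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in the order of the inputs.
    mapped = list(executor.map(slow_echo, delays))

    # as_completed() yields futures as they finish, usually fastest first.
    futures = [executor.submit(slow_echo, d) for d in delays]
    completed = [f.result() for f in as_completed(futures)]

print(mapped)      # [0.3, 0.1, 0.2]
print(completed)   # typically [0.1, 0.2, 0.3], depending on scheduling
```

Use map() when results must line up with inputs, and as_completed() when each result should be handled as soon as it is available.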

9. Shared Memory and Communication

Manager Objects

Python — Manager
import multiprocessing
from multiprocessing import Manager
import time

def worker_dict(shared_dict: dict, key: str, value: str):
    """Worker that writes to a shared dictionary."""
    time.sleep(0.5)
    shared_dict[key] = value
    print(f"  Set {key} = {value}")

def worker_list(shared_list: list, items: list):
    """Worker that appends to a shared list."""
    for item in items:
        shared_list.append(item)

if __name__ == "__main__":
    with Manager() as manager:
        # Shared dictionary
        shared_dict = manager.dict()
        processes = [
            multiprocessing.Process(target=worker_dict, args=(shared_dict, f"key{i}", f"val{i}"))
            for i in range(5)
        ]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Shared dict: {dict(shared_dict)}")

        # Shared list
        shared_list = manager.list()
        processes = [
            multiprocessing.Process(target=worker_list, args=(shared_list, [i*10, i*10+1]))
            for i in range(3)
        ]

        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Shared list: {sorted(shared_list)}")

        # Shared namespace
        ns = manager.Namespace()
        ns.app_name = "MyApp"
        ns.version = "1.0"
        print(f"Namespace: {ns.app_name} v{ns.version}")

Pipe for Communication

Python — Pipe
import multiprocessing
from multiprocessing import Pipe
import time

def sender(conn):
    """Send data through the pipe."""
    messages = ["Hello", "World", "From", "Sender"]
    for msg in messages:
        conn.send(msg)
        print(f"  Sent: {msg}")
        time.sleep(0.5)
    conn.send(None)  # Signal that we are done
    conn.close()

def receiver(conn):
    """Receive data from the pipe."""
    while True:
        msg = conn.recv()
        if msg is None:
            break
        print(f"  Received: {msg}")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()

    p1 = multiprocessing.Process(target=sender, args=(parent_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(child_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print("Communication finished!")

10. Asyncio Overview

asyncio is a modern alternative to threading for I/O-bound workloads that need to scale to thousands of concurrent operations.

Python — Asyncio Basics
import asyncio
import time

async def fetch_data(name: str, delay: float) -> str:
    """Simulate an async I/O operation."""
    print(f"  📥 {name}: starting fetch...")
    await asyncio.sleep(delay)  # Non-blocking sleep
    print(f"  ✅ {name}: done!")
    return f"Data from {name}"

async def main():
    # Sequential (await one at a time)
    print("=== Sequential ===")
    start = time.time()
    r1 = await fetch_data("API-1", 2)
    r2 = await fetch_data("API-2", 3)
    print(f"  Time: {time.time() - start:.1f}s\n")

    # Concurrent (run together)
    print("=== Concurrent ===")
    start = time.time()
    results = await asyncio.gather(
        fetch_data("API-1", 2),
        fetch_data("API-2", 3),
        fetch_data("API-3", 1),
    )
    print(f"  Time: {time.time() - start:.1f}s")
    print(f"  Results: {results}")

    # Task with a timeout
    print("\n=== With Timeout ===")
    try:
        result = await asyncio.wait_for(
            fetch_data("Slow-API", 10),
            timeout=3.0
        )
    except asyncio.TimeoutError:  # Alias of the built-in TimeoutError since Python 3.11
        print("  ⏰ Timeout!")

asyncio.run(main())
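
At the scale of thousands of operations, launching everything at once can overwhelm the remote service, so concurrency is often capped. The sketch below assumes an asyncio.Semaphore for that cap and uses asyncio.sleep as a stand-in for a network call; fetch, run_many, and the stats dictionary are illustrative names.

```python
import asyncio

async def fetch(i: int, limiter: asyncio.Semaphore, stats: dict) -> int:
    async with limiter:                       # at most `limit` coroutines inside
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)             # stand-in for a network call
        stats["active"] -= 1
    return i * 2

async def run_many(n: int, limit: int):
    limiter = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    # gather() returns results in the order the coroutines were passed in.
    results = await asyncio.gather(*(fetch(i, limiter, stats) for i in range(n)))
    return results, stats["peak"]

results, peak = asyncio.run(run_many(n=1000, limit=50))
print(f"{len(results)} results, peak concurrency {peak}")
```

The counters need no lock because all coroutines run on one thread and there is no await between reading and updating them.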

11. Case Studies

Parallel Web Scraper

Python — Web Scraper
"""
Parallel Web Scraper: uses ThreadPoolExecutor
to download and process web pages concurrently.
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import json
from typing import List, Dict
from dataclasses import dataclass
from datetime import datetime

@dataclass
class ScrapeResult:
    url: str
    status: int
    content_length: int
    elapsed: float
    timestamp: str

def scrape_url(url: str) -> ScrapeResult:
    """Simulate scraping a single URL."""
    start = time.time()

    # Simulated network request
    import random
    time.sleep(random.uniform(0.5, 2.0))

    status = random.choice([200, 200, 200, 404, 500])
    content_length = random.randint(1000, 50000) if status == 200 else 0

    return ScrapeResult(
        url=url,
        status=status,
        content_length=content_length,
        elapsed=time.time() - start,
        timestamp=datetime.now().isoformat()
    )

def scrape_batch(urls: List[str], max_workers: int = 10) -> List[ScrapeResult]:
    """Scrape multiple URLs concurrently."""
    results = []
    errors = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(scrape_url, url): url
            for url in urls
        }

        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result(timeout=30)
                results.append(result)
                status_icon = "✅" if result.status == 200 else "⚠️"
                print(f"  {status_icon} {url}: {result.status} "
                      f"({result.elapsed:.2f}s, {result.content_length} bytes)")
            except Exception as e:
                errors.append({"url": url, "error": str(e)})
                print(f"  ❌ {url}: {e}")

    return results

if __name__ == "__main__":
    urls = [f"https://example.com/page/{i}" for i in range(20)]

    print(f"🕷️ Scraping {len(urls)} URLs...\n")
    start = time.time()

    results = scrape_batch(urls, max_workers=8)

    elapsed = time.time() - start
    success = sum(1 for r in results if r.status == 200)

    print("\n📊 Report:")
    print(f"  Total:     {len(urls)}")
    print(f"  Success:   {success}")
    print(f"  Failed:    {len(urls) - success}")
    print(f"  Time:      {elapsed:.2f}s")
    print(f"  Avg/URL:   {elapsed/len(urls):.2f}s")

Image Processor

Python — Image Processor
"""
Parallel Image Processor: uses ProcessPoolExecutor
to process images in parallel (CPU-bound).
"""
from concurrent.futures import ProcessPoolExecutor, as_completed
from pathlib import Path
import time
import os

def process_image(image_path: str) -> dict:
    """Simulate image processing (resize, filter, etc.)."""
    start = time.time()
    pid = os.getpid()

    # Simulate heavy computation
    data = bytearray(1_000_000)  # 1 MB of data
    for i in range(0, len(data), 100):
        data[i] = min(255, data[i] + 10)  # Simulated brightness adjustment

    elapsed = time.time() - start
    return {
        "path": image_path,
        "pid": pid,
        "elapsed": elapsed,
        "output_size": len(data),
    }

if __name__ == "__main__":
    # Simulated list of images
    images = [f"photos/image_{i:04d}.jpg" for i in range(50)]

    # Sequential
    start = time.time()
    for img in images[:10]:  # Only 10 for the demo
        process_image(img)
    seq_time = time.time() - start

    # Parallel
    start = time.time()
    with ProcessPoolExecutor(max_workers=os.cpu_count()) as executor:
        futures = {executor.submit(process_image, img): img for img in images[:10]}
        for future in as_completed(futures):
            result = future.result()
    par_time = time.time() - start

    print(f"Sequential: {seq_time:.2f}s")
    print(f"Parallel:   {par_time:.2f}s")
    print(f"Speedup:    {seq_time/par_time:.2f}x")

12. Quiz: Test Your Understanding!

After reading the tutorial above, answer the following 5 questions:

Question 1: For CPU-bound tasks, which module should you use?

a) threading
b) multiprocessing
c) asyncio
d) subprocess

Question 2: What is the GIL in Python?

a) A library for GPU computing
b) A mechanism that ensures only one thread executes bytecode at a time
c) A function for garbage collection
d) A networking protocol

Question 3: Which method is used to wait for a thread to finish?

a) thread.wait()
b) thread.stop()
c) thread.join()
d) thread.finish()

Question 4: What does threading.Lock() do?

a) Locks a file against access by other threads
b) Prevents race conditions by restricting access to a shared resource
c) Stops threads that run for too long
d) Encrypts data passed between threads

Question 5: What kind of task is ThreadPoolExecutor suited for?

a) CPU-bound tasks
b) I/O-bound tasks
c) GPU computing
d) All kinds of tasks