1. Pengenalan Multiprocessing
Python memiliki modul multiprocessing bawaan yang memungkinkan Anda menjalankan beberapa proses secara paralel. Berbeda dengan threading yang terbatas oleh GIL (Global Interpreter Lock), multiprocessing benar-benar menjalankan kode secara paralel di multiple CPU cores.
Apa Itu Multiprocessing?
Multiprocessing adalah teknik menjalankan beberapa proses secara bersamaan, masing-masing dengan interpreter Python dan memory space sendiri. Ini sangat ideal untuk tugas yang CPU-bound (komputasi intensif) seperti pemrosesan data, perhitungan matematika, dan image processing.
```
┌────────────────────────────────────────────────────────────┐
│ SINGLE PROCESS (Sequential)                                │
│                                                            │
│ CPU Core 1: [Task1]→[Task2]→[Task3]→[Task4]                │
│ CPU Core 2: [idle]                                         │
│ CPU Core 3: [idle]                                         │
│ CPU Core 4: [idle]                                         │
│                                                            │
│ Waktu: ████████████████████████                            │
├────────────────────────────────────────────────────────────┤
│ MULTIPROCESSING (Parallel)                                 │
│                                                            │
│ CPU Core 1: [Task1]──────┐                                 │
│ CPU Core 2: [Task2]──────┤                                 │
│ CPU Core 3: [Task3]──────┤                                 │
│ CPU Core 4: [Task4]──────┘                                 │
│                                                            │
│ Waktu: ████████                                            │
└────────────────────────────────────────────────────────────┘
```
Kapan Menggunakan Multiprocessing?
| Jenis Tugas | Contoh | Solusi Terbaik |
|---|---|---|
| CPU-bound | Kalkulasi matematika, encoding video | multiprocessing |
| I/O-bound | Download file, database query | threading atau asyncio |
| Campuran | Web scraping + parsing | Kombinasi |
2. Threading vs Multiprocessing
Sebelum lanjut, penting untuk membedakan antara threading dan multiprocessing.
Global Interpreter Lock (GIL)
GIL adalah mekanisme di CPython yang memastikan hanya satu thread yang menjalankan bytecode Python pada satu waktu. Ini berarti threading tidak bisa memanfaatkan multiple CPU cores untuk tugas CPU-bound.
| Aspek | Threading | Multiprocessing |
|---|---|---|
| Memory | Berbagi memory | Memory terpisah per proses |
| GIL | Terpengaruh GIL | Tidak terpengaruh GIL |
| CPU-bound | ❌ Tidak efektif | ✅ Sangat efektif |
| I/O-bound | ✅ Efektif | Overhead lebih besar |
| Overhead | Rendah | Tinggi (start proses baru) |
| Komunikasi | Mudah (shared memory) | Perlu IPC (Queue, Pipe) |
| Crash | Thread crash → semua crash | Proses crash → proses lain aman |
import time
import threading
import multiprocessing
def hitung_berat(n):
    """Simulate a CPU-bound task by summing the squares of 0..n-1."""
    return sum(i * i for i in range(n))
# The __main__ guard is required here: with the "spawn" start method (the
# default on Windows and macOS) every child re-imports this module, and
# creating a Process at import time fails with a RuntimeError.
if __name__ == '__main__':
    N = 10_000_000  # iterations per task, identical for every benchmark

    # Benchmark: sequential
    start = time.time()
    hitung_berat(N)
    hitung_berat(N)
    sequential_time = time.time() - start
    print(f"Sequential: {sequential_time:.2f}s")

    # Benchmark: threading (does not help for CPU-bound work)
    start = time.time()
    t1 = threading.Thread(target=hitung_berat, args=(N,))
    t2 = threading.Thread(target=hitung_berat, args=(N,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    threading_time = time.time() - start
    print(f"Threading: {threading_time:.2f}s")  # about the same as sequential

    # Benchmark: multiprocessing (each task runs in its own process)
    start = time.time()
    p1 = multiprocessing.Process(target=hitung_berat, args=(N,))
    p2 = multiprocessing.Process(target=hitung_berat, args=(N,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    mp_time = time.time() - start
    print(f"Multiprocessing: {mp_time:.2f}s")  # up to ~2x faster
3. Membuat Proses Dasar
Membuat Process dengan class Process
import multiprocessing
import os
import time
def cetak_nama(nama, delay):
    """Print a start message, sleep ``delay`` seconds, print a done message.

    Intended as a ``Process`` target; both messages are tagged with the PID
    of whichever process executes the call.
    """
    tag = f"[PID {os.getpid()}]"
    print(f"{tag} Mulai: {nama}")
    time.sleep(delay)
    print(f"{tag} Selesai: {nama} ({delay}s)")
if __name__ == '__main__':
    pid_utama = os.getpid()

    # Create the child processes; nothing runs until start() is called.
    anak = [
        multiprocessing.Process(target=cetak_nama, args=(nama, delay))
        for nama, delay in (("Proses A", 2), ("Proses B", 1))
    ]
    p1 = anak[0]
    print(f"[PID {pid_utama}] Main process mulai")

    # Launch the children.
    for p in anak:
        p.start()
    print(f"p1 alive: {p1.is_alive()}")  # True
    print(f"p1 PID: {p1.pid}")

    # Wait for the children to finish.
    for p in anak:
        p.join()
    print(f"p1 exit code: {p1.exitcode}")  # 0 = success
    print(f"[PID {pid_utama}] Main process selesai")

# Example output (PIDs differ per run; the interleaving of parent and child
# lines is not guaranteed):
# [PID 1234] Main process mulai
# p1 alive: True
# p1 PID: 1235
# [PID 1235] Mulai: Proses A
# [PID 1236] Mulai: Proses B
# [PID 1236] Selesai: Proses B (1s)
# [PID 1235] Selesai: Proses A (2s)
# [PID 1234] Main process selesai
Process dengan Subclass
import multiprocessing
import os
class WorkerProses(multiprocessing.Process):
    """Process subclass that sums ``range(tugas)`` and prints the result."""

    def __init__(self, nama, tugas):
        super().__init__()
        self.nama = nama    # label used in the printed messages
        self.tugas = tugas  # upper bound passed to range()

    def run(self):
        """Body executed in the child process once start() is called."""
        label = f"[{self.nama}]"
        print(f"{label} PID: {os.getpid()} mulai mengerjakan: {self.tugas}")
        total = sum(range(self.tugas))  # simulated work
        print(f"{label} Selesai! Hasil: {total}")
if __name__ == '__main__':
    daftar_tugas = (
        ("Worker-A", 1_000_000),
        ("Worker-B", 2_000_000),
        ("Worker-C", 3_000_000),
    )
    workers = [WorkerProses(nama, tugas) for nama, tugas in daftar_tugas]

    # Start everything first, then wait, so the workers overlap.
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    print("Semua worker selesai!")
4. Process Pool
Pool memungkinkan Anda menjalankan fungsi secara paralel dengan jumlah worker yang terkontrol. Pool secara otomatis mendistribusikan tugas ke proses-proses yang tersedia.
import multiprocessing
import time
import os
def hitung_kuadrat(n):
    """Square ``n``, pause for 0.5 s, and return a string tagged with the PID."""
    kuadrat = n * n
    time.sleep(0.5)
    return f"PID {os.getpid()}: {n}² = {kuadrat}"
def proses_data(nama, angka):
    """Return ``"<nama>: <angka * 2>"``; the task used by the starmap() demo.

    Defined at module level, outside the ``__main__`` guard, so that pool
    workers started with the "spawn" method can import it by name. A function
    defined only inside the guard does not exist in such workers.
    """
    return f"{nama}: {angka * 2}"


if __name__ == '__main__':
    data = list(range(1, 11))

    # --- Pool.map(): distribute the data across the pool workers ---
    with multiprocessing.Pool(processes=4) as pool:
        hasil = pool.map(hitung_kuadrat, data)
        for h in hasil:
            print(h)

    # --- Pool.apply(): run one task and block for its result ---
    with multiprocessing.Pool(processes=4) as pool:
        result = pool.apply(hitung_kuadrat, args=(42,))
        print(f"apply: {result}")

    # --- Pool.map_async(): non-blocking ---
    with multiprocessing.Pool(processes=4) as pool:
        async_result = pool.map_async(hitung_kuadrat, data)
        # Other work can be done here while the pool is busy.
        print("Menunggu hasil async...")
        # get() must run inside the with-block: leaving it terminates the pool.
        hasil = async_result.get()  # blocks until every task is done
        for h in hasil:
            print(h)

    # --- Pool.starmap(): tasks that take several arguments ---
    with multiprocessing.Pool(processes=2) as pool:
        args_list = [("Data-A", 10), ("Data-B", 20), ("Data-C", 30)]
        hasil = pool.starmap(proses_data, args_list)
        print(hasil)  # ['Data-A: 20', 'Data-B: 40', 'Data-C: 60']
Pool: map vs imap vs apply vs starmap
| Metode | Blocking | Return | Urutan | Multi Args |
|---|---|---|---|---|
| `map()` | Ya | List | Terurut | Tidak |
| `map_async()` | Tidak | AsyncResult | Terurut | Tidak |
| `imap()` | Ya (lazy) | Iterator | Terurut | Tidak |
| `imap_unordered()` | Ya (lazy) | Iterator | Tidak | Tidak |
| `apply()` | Ya | Result | - | Tidak |
| `starmap()` | Ya | List | Terurut | Ya |
Menentukan Jumlah Proses Optimal
import multiprocessing
def init_worker():
    """Run once in each worker process when the pool starts."""
    import os
    print(f"Worker PID {os.getpid()} siap")


# Pools must be created under the __main__ guard: with the "spawn" start
# method each worker re-imports this module, and an unguarded Pool would be
# created again inside every worker.
if __name__ == '__main__':
    # Number of CPU cores available
    cpu_count = multiprocessing.cpu_count()
    print(f"CPU cores: {cpu_count}")  # e.g. 8

    # Rule of thumb: one worker per core, or slightly more when the tasks
    # also spend time waiting on I/O.
    # NOTE: hitung_kuadrat is the function from the Process Pool example.
    with multiprocessing.Pool(processes=cpu_count) as pool:
        hasil = pool.map(hitung_kuadrat, range(100))

    # Alternatively, pass an initializer for per-worker setup.
    with multiprocessing.Pool(processes=4, initializer=init_worker) as pool:
        hasil = pool.map(hitung_kuadrat, range(10))
5. Queue untuk Komunikasi
Karena setiap proses memiliki memory space terpisah, mereka tidak bisa berbagi variabel secara langsung. Queue menyediakan mekanisme komunikasi yang aman antar proses menggunakan pola FIFO (First In, First Out).
import multiprocessing
import time
import random
def producer(queue, nama, jumlah_item=5):
    """Put ``jumlah_item`` labelled items on ``queue``, pausing before each.

    The producer deliberately does NOT enqueue a stop signal. With several
    producers sharing one queue, the first producer to finish would otherwise
    stop the consumer while the others are still producing, and their
    remaining items would never be consumed. The coordinator sends the stop
    signal once all producers have finished.

    Args:
        queue: object with a ``put()`` method (e.g. ``multiprocessing.Queue``).
        nama: label used in the item names and log lines.
        jumlah_item: number of items to produce (default 5).
    """
    for i in range(jumlah_item):
        item = f"{nama}-item-{i}"
        time.sleep(random.uniform(0.1, 0.5))
        queue.put(item)
        print(f"[{nama}] Memproduksi: {item}")
    print(f"[{nama}] Selesai memproduksi")


def consumer(queue, nama):
    """Consume items from ``queue`` until a ``None`` stop signal arrives."""
    while True:
        item = queue.get()
        if item is None:  # stop signal
            print(f"[{nama}] Menerima sinyal selesai")
            break
        print(f"[{nama}] Mengonsumsi: {item}")
        time.sleep(0.2)


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    # Two producers feeding one consumer.
    p1 = multiprocessing.Process(target=producer, args=(queue, "Producer-1"))
    p2 = multiprocessing.Process(target=producer, args=(queue, "Producer-2"))
    c1 = multiprocessing.Process(target=consumer, args=(queue, "Consumer"))
    p1.start()
    p2.start()
    c1.start()

    p1.join()
    p2.join()
    # Every producer is done, so everything they produced is queued ahead of
    # this single stop signal (one signal per consumer).
    queue.put(None)
    c1.join()
    print("Semua proses selesai!")
Queue dengan Limit Ukuran
import multiprocessing
import time
def producer_bounded(queue):
    """Put item-0..item-9 and then "DONE" on ``queue``.

    With a bounded queue, put() blocks while the queue is full.
    """
    for nomor in range(10):
        label = f"item-{nomor}"
        print(f" Memasukkan {label}...")
        queue.put(label)  # blocks while the queue is full
        print(f" {label} berhasil dimasukkan")
    queue.put("DONE")
def consumer_bounded(queue, delay=1):
    """Slowly drain ``queue`` until the "DONE" marker is received.

    The pause before every get() makes the consumer slower than the producer,
    so a bounded queue fills up and the producer blocks.

    Args:
        queue: object with a ``get()`` method (e.g. ``multiprocessing.Queue``).
        delay: seconds to sleep before each get(); defaults to 1.
    """
    while True:
        time.sleep(delay)  # deliberately slow consumer
        item = queue.get()
        if item == "DONE":
            break
        print(f" Mengambil: {item}")
if __name__ == '__main__':
    # Queue that holds at most 3 items.
    q = multiprocessing.Queue(maxsize=3)
    p1, c1 = (
        multiprocessing.Process(target=fungsi, args=(q,))
        for fungsi in (producer_bounded, consumer_bounded)
    )
    for proses in (p1, c1):
        proses.start()
    for proses in (p1, c1):
        proses.join()
6. Pipe untuk Komunikasi
Pipe menyediakan channel komunikasi dua arah antara dua proses. Lebih cepat dari Queue tetapi hanya untuk komunikasi 1-to-1.
import multiprocessing
import os
def proses_anak(conn):
    """Child end of the pipe: send two messages, wait for a reply, close."""
    salam = f"Hello dari anak (PID {os.getpid()})!"
    muatan = {"data": [1, 2, 3], "status": "ok"}
    for pesan in (salam, muatan):
        conn.send(pesan)

    # Block until the parent answers.
    balasan = conn.recv()
    print(f"[Anak] Menerima balasan: {balasan}")
    conn.close()
if __name__ == '__main__':
    # Pipe() returns the two connection ends: (parent_conn, child_conn).
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=proses_anak, args=(child_conn,))
    p.start()

    # The parent does not use the child's end.
    child_conn.close()

    # Receive the two messages sent by the child.
    for _ in range(2):
        print(f"[Parent] Menerima: {parent_conn.recv()}")

    # Send the reply the child is waiting for.
    parent_conn.send("Terima kasih, Anak!")
    p.join()
    parent_conn.close()
7. Shared Memory
Untuk berbagi data antar proses secara efisien, Python menyediakan shared memory yang memungkinkan beberapa proses mengakses data yang sama di memory.
Value dan Array
import multiprocessing
import ctypes
def tambah_counter(shared_counter, lock, n):
    """Add 1 to ``shared_counter.value`` ``n`` times, holding ``lock`` each time."""
    for _ in range(n):
        # The read-modify-write on .value must happen under the lock.
        lock.acquire()
        try:
            shared_counter.value += 1
        finally:
            lock.release()
def proses_array(shared_array, lock, start_idx, values):
    """Copy ``values`` into ``shared_array`` from ``start_idx``, under ``lock``."""
    with lock:
        for posisi, nilai in enumerate(values, start=start_idx):
            shared_array[posisi] = nilai
if __name__ == '__main__':
    # Shared integer, type code 'i' (signed int), initial value 0.
    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    # Shared array of 10 elements, type code 'd' (double).
    shared_arr = multiprocessing.Array('d', 10)

    # Four processes, each incrementing the counter 100000 times.
    proses_list = [
        multiprocessing.Process(
            target=tambah_counter, args=(counter, lock, 100_000)
        )
        for _ in range(4)
    ]
    for p in proses_list:
        p.start()
    for p in proses_list:
        p.join()
    print(f"Counter: {counter.value}")  # expected: 400000

    # Fill the shared array in two halves (called directly in this process).
    isian = (
        (0, [1.1, 2.2, 3.3, 4.4, 5.5]),
        (5, [6.6, 7.7, 8.8, 9.9, 10.0]),
    )
    for awal, nilai in isian:
        proses_array(shared_arr, lock, awal, nilai)
    print(f"Array: {list(shared_arr)}")
    # [1.1, 2.2, 3.3, 4.4, 5.5, 6.6, 7.7, 8.8, 9.9, 10.0]
Type Codes untuk Shared Memory
| Type Code | C Type | Python Type | Ukuran |
|---|---|---|---|
| `'i'` | signed int | int | 2 atau 4 bytes |
| `'d'` | double | float | 8 bytes |
| `'f'` | float | float | 4 bytes |
| `'c'` | char | bytes (1) | 1 byte |
SharedMemory (Python 3.8+)
from multiprocessing import Process, shared_memory
import numpy as np
def proses_a(shm_name, shape, dtype):
    """Attach to the named shared memory block and double every element in place.

    ``shape`` and ``dtype`` describe how the raw buffer is viewed as an array.
    """
    shm = shared_memory.SharedMemory(name=shm_name)
    array = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
    dobel = array * 2
    array[...] = dobel  # write the result back into the shared buffer
    shm.close()
if __name__ == '__main__':
    # Build a numpy array that lives in shared memory.
    original = np.array([1, 2, 3, 4, 5], dtype=np.float64)
    shm = shared_memory.SharedMemory(create=True, size=original.nbytes)
    try:
        shared_array = np.ndarray(
            original.shape, dtype=original.dtype, buffer=shm.buf
        )
        shared_array[:] = original[:]
        print(f"Sebelum: {shared_array}")

        # Run a process that modifies the shared block.
        p = Process(
            target=proses_a, args=(shm.name, original.shape, original.dtype)
        )
        p.start()
        p.join()
        print(f"Sesudah: {shared_array}")  # [ 2.  4.  6.  8. 10.]
    finally:
        # Always release and destroy the block, even if something above
        # raised; a block that is never unlinked is leaked.
        shm.close()
        shm.unlink()
8. Sinkronisasi Proses
Ketika beberapa proses mengakses resource yang sama, kita perlu mekanisme sinkronisasi untuk mencegah race condition.
Lock (Mutex)
import multiprocessing
import time
def cetak_nama(lock, nama, delay):
    """Do the start/sleep/finish sequence while holding ``lock``.

    Callers sharing the same lock therefore run this body one at a time.
    """
    lock.acquire()
    try:
        print(f"[{nama}] Mulai kerja")
        time.sleep(delay)
        print(f"[{nama}] Selesai ({delay}s)")
    finally:
        lock.release()
if __name__ == '__main__':
    lock = multiprocessing.Lock()
    proses = [
        multiprocessing.Process(target=cetak_nama, args=(lock, nama, delay))
        for nama, delay in (("Proses-A", 2), ("Proses-B", 1), ("Proses-C", 3))
    ]
    for p in proses:
        p.start()
    for p in proses:
        p.join()

# Example output: each process waits for its turn, so the start/finish pairs
# never interleave. Which process gets the lock first is not guaranteed.
# [Proses-A] Mulai kerja
# [Proses-A] Selesai (2s)
# [Proses-B] Mulai kerja
# [Proses-B] Selesai (1s)
# [Proses-C] Mulai kerja
# [Proses-C] Selesai (3s)
Semaphore dan Event
import multiprocessing
import time
# Semaphore: limits how many processes may hold the resource at once.
def akses_resource(sem, nama, durasi):
    """Hold one slot of ``sem`` for ``durasi`` seconds, logging entry and exit."""
    sem.acquire()  # blocks while all slots are taken
    try:
        print(f"[{nama}] Mengakses resource")
        time.sleep(durasi)
        print(f"[{nama}] Selesai")
    finally:
        sem.release()
# Event: a signal shared between processes.
def tunggu_sinyal(event, nama):
    """Block until ``event`` is set, logging before and after the wait."""
    label = f"[{nama}]"
    print(f"{label} Menunggu sinyal...")
    event.wait()  # returns once the event has been set
    print(f"{label} Sinyal diterima! Melanjutkan kerja")
if __name__ == '__main__':
    # Semaphore: at most 3 processes inside the guarded section at once.
    sem = multiprocessing.Semaphore(3)
    proses = [
        multiprocessing.Process(
            target=akses_resource, args=(sem, f"Worker-{i}", 1)
        )
        for i in range(6)
    ]
    for p in proses:
        p.start()
    for p in proses:
        p.join()

    # Event: two waiters released by a single set().
    event = multiprocessing.Event()
    penunggu = [
        multiprocessing.Process(target=tunggu_sinyal, args=(event, nama))
        for nama in ("W1", "W2")
    ]
    for p in penunggu:
        p.start()
    time.sleep(2)
    print("[Main] Mengirim sinyal!")
    event.set()  # every process waiting on the event continues
    for p in penunggu:
        p.join()
9. Manager untuk Data Bersama
Manager menyediakan cara untuk berbagi objek Python kompleks (dict, list, Namespace) antar proses melalui server proxy.
import multiprocessing
def update_dict(shared_dict, key, value):
    """Store ``value`` under ``key`` in ``shared_dict`` and log the assignment."""
    shared_dict.update({key: value})
    print(f" Set {key} = {value}")
def update_list(shared_list, item):
    """Add ``item`` to the end of ``shared_list``."""
    shared_list.extend([item])
if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        # Proxy objects that can be handed to other processes.
        shared_dict = manager.dict()
        shared_list = manager.list()

        # Five dict writers followed by three list appenders.
        penulis_dict = [
            multiprocessing.Process(
                target=update_dict, args=(shared_dict, f"key_{i}", i * 10)
            )
            for i in range(5)
        ]
        penulis_list = [
            multiprocessing.Process(
                target=update_list, args=(shared_list, f"item_{i}")
            )
            for i in range(3)
        ]
        proses = penulis_dict + penulis_list

        for p in proses:
            p.start()
        for p in proses:
            p.join()

        # Every change is visible from the main process.
        print(f"Dict: {dict(shared_dict)}")
        # {'key_0': 0, 'key_1': 10, 'key_2': 20, ...}
        print(f"List: {list(shared_list)}")
        # ['item_0', 'item_1', 'item_2'] (order depends on scheduling)
10. ProcessPoolExecutor
ProcessPoolExecutor dari modul concurrent.futures adalah API modern yang lebih mudah digunakan untuk multiprocessing.
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
import math
def hitung_faktorial(n):
    """Return ``(n, n!, number of decimal digits of n!)``.

    The digit count is computed arithmetically rather than with
    ``len(str(hasil))``. Since Python 3.11 (and the security backports to
    older versions) converting an int with more than 4300 digits to ``str``
    raises ``ValueError`` by default, which already happens for 10000!.

    Raises:
        ValueError: if ``n`` is negative (from ``math.factorial``).
    """
    hasil = math.factorial(n)
    # hasil >= 1, so log10 is defined. The float estimate can be off by one
    # near a power of ten; the exact integer comparisons below correct it.
    digit = int(math.log10(hasil)) + 1
    if 10 ** (digit - 1) > hasil:
        digit -= 1
    elif 10 ** digit <= hasil:
        digit += 1
    return (n, hasil, digit)
def hitung_prime_range(start, end):
    """Return the primes in the inclusive range [start, end] by trial division."""

    def _adalah_prima(num):
        # No divisor in 2..floor(sqrt(num)) means num is prime.
        batas = int(math.sqrt(num)) + 1
        return all(num % pembagi != 0 for pembagi in range(2, batas))

    return [
        num
        for num in range(start, end + 1)
        if num >= 2 and _adalah_prima(num)
    ]
if __name__ == '__main__':
    # --- submit(): one future per task ---
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(hitung_faktorial, n)
            for n in [10000, 20000, 30000, 40000]
        ]
        for f in futures:
            n, _, digit = f.result()
            print(f"{n}! = {digit} digit")

    # --- map(): spread the ranges over the workers ---
    ranges = [(1, 25000), (25001, 50000), (50001, 75000), (75001, 100000)]
    start_time = time.time()
    # executor.map takes one iterable per positional argument.
    daftar_awal, daftar_akhir = zip(*ranges)
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(
            executor.map(hitung_prime_range, daftar_awal, daftar_akhir)
        )
    all_primes = [p for primes in results for p in primes]
    duration = time.time() - start_time
    print(f"Ditemukan {len(all_primes)} bilangan prima dalam {duration:.2f}s")

    # --- as_completed(): handle results in completion order ---
    with ProcessPoolExecutor(max_workers=4) as executor:
        future_to_n = {}
        for n in [100000, 200000, 300000]:
            future_to_n[executor.submit(hitung_faktorial, n)] = n
        for future in as_completed(future_to_n):
            n = future_to_n[future]
            result = future.result()
            print(f" {n}! selesai ({result[2]} digit)")
11. Best Practices
- Selalu gunakan `if __name__ == '__main__':` guard
- Gunakan `with` statement untuk Pool/Executor agar auto-cleanup
- Hindari mengirim objek besar antar proses — gunakan shared memory
- Gunakan `initializer` untuk setup yang berat (database connection, model loading)
- Pertimbangkan `chunksize` di `Pool.map()` untuk data besar
Error Handling dalam Multiprocessing
from concurrent.futures import ProcessPoolExecutor, as_completed
import multiprocessing
def tugas_bisa_error(n):
    """Return ``n`` squared, except that ``n == 5`` raises ``ValueError``."""
    if n != 5:
        return n * n
    raise ValueError(f"Error pada n={n}")
if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Map each future back to the input it was submitted with.
        futures = {}
        for angka in range(10):
            futures[executor.submit(tugas_bisa_error, angka)] = angka

        for future in as_completed(futures):
            n = futures[future]
            try:
                # result() re-raises any exception raised by the task.
                result = future.result()
                print(f" {n} → {result}")
            except ValueError as e:
                print(f" {n} → ERROR: {e}")
            except Exception as e:
                print(f" {n} → UNEXPECTED: {e}")
Pattern: Producer-Consumer
import multiprocessing
import time
import random
SENTINEL = "STOP"  # marker telling a consumer (or the collector) to stop


def producer(queue, n_items, n_consumers=1):
    """Put ``n_items`` random integers on ``queue``, then the stop markers.

    Each consumer exits after taking exactly one ``SENTINEL``, so one marker
    is enqueued per consumer. With a single marker and several consumers,
    all but one of them would block forever on ``queue.get()``.

    Args:
        queue: work queue with a ``put()`` method.
        n_items: number of random items (1..100) to produce.
        n_consumers: number of consumers reading ``queue`` (default 1).
    """
    for _ in range(n_items):
        item = random.randint(1, 100)
        queue.put(item)
        print(f" [Producer] → {item}")
        time.sleep(random.uniform(0.05, 0.2))
    for _ in range(n_consumers):
        queue.put(SENTINEL)


def consumer(queue, result_queue, nama):
    """Square every item from ``queue`` and put ``(item, square)`` on
    ``result_queue``; on ``SENTINEL``, forward it to ``result_queue`` and stop.
    """
    while True:
        item = queue.get()
        if item == SENTINEL:
            result_queue.put(SENTINEL)
            break
        # Processing step: compute the square.
        result = item ** 2
        result_queue.put((item, result))
        time.sleep(random.uniform(0.1, 0.3))


if __name__ == '__main__':
    N_CONSUMERS = 3
    work_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    # 1 producer, N_CONSUMERS consumers
    prod = multiprocessing.Process(
        target=producer, args=(work_queue, 10, N_CONSUMERS)
    )
    consumers = [
        multiprocessing.Process(
            target=consumer, args=(work_queue, result_queue, f"C{i}")
        )
        for i in range(N_CONSUMERS)
    ]
    prod.start()
    for c in consumers:
        c.start()

    # Collect results BEFORE joining: a process that has put items on a
    # queue may not exit until those items have been consumed.
    stop_count = 0
    while stop_count < N_CONSUMERS:
        item = result_queue.get()
        if item == SENTINEL:
            stop_count += 1
        else:
            print(f" [Result] {item[0]}² = {item[1]}")

    prod.join()
    for c in consumers:
        c.join()
12. Quiz Pemahaman
🧠 Quiz: Python Multiprocessing
1. Mengapa multiprocessing lebih cepat dari threading untuk tugas CPU-bound?
2. Metode Pool mana yang mengembalikan hasil dalam urutan input?
3. Apa fungsi utama multiprocessing.Lock?
4. Apa perbedaan Queue dan Pipe?
5. Kenapa harus pakai if __name__ == '__main__':?