1. Pengenalan Concurrency
Concurrency (konkurensi) adalah kemampuan program untuk menjalankan beberapa tugas secara bersamaan. Di Rust, concurrency adalah salah satu fitur terkuat — Rust menjamin keamanan concurrency saat compile time, yang disebut "Fearless Concurrency".
Concurrency vs Parallelism
| Konsep | Penjelasan | Analogi |
|---|---|---|
| Concurrency | Menangani beberapa tugas dalam satu periode waktu (bisa bergantian) | Satu koki memasak beberapa hidangan dengan bergantian |
| Parallelism | Menjalankan beberapa tugas secara LITERAL bersamaan (di core berbeda) | Beberapa koki memasak beberapa hidangan sekaligus |
Mengapa Concurrency Sulit?
| Masalah | Penjelasan | Rust Mencegah? |
|---|---|---|
| Data Race | Dua thread mengakses data bersamaan, salah satunya menulis | ✅ Ya, compile-time |
| Deadlock | Dua thread saling menunggu satu sama lain | ❌ Tidak (perlu hati-hati) |
| Race Condition | Hasil tergantung urutan eksekusi yang tidak terduga | ✅ Sebagian besar |
| Memory Safety | Thread mengakses memori yang sudah di-drop | ✅ Ya, ownership system |
┌─────────────────────────────────────────────────────────────┐ │ CONCURRENCY vs PARALLELISM │ │ │ │ CONCURRENCY (satu core, bergantian): │ │ Thread A: ██──░░██──░░██── │ │ Thread B: ░░██──░░██──░░██ │ │ (██ = bekerja, ░░ = menunggu) │ │ │ │ PARALLELISM (multi-core, bersamaan): │ │ Core 1 — Thread A: ████████████████ │ │ Core 2 — Thread B: ████████████████ │ │ Core 3 — Thread C: ████████████████ │ │ │ │ Rust mendukung keduanya dengan aman! │ └─────────────────────────────────────────────────────────────┘
Rust mencegah data race saat compile time melalui sistem ownership. Jika kode Anda bisa menyebabkan data race, compiler tidak akan mengizinkannya. Ini berarti Anda bisa menulis kode concurrent dengan percaya diri.
2. Threads
Rust menggunakan OS threads (1:1 mapping) melalui std::thread. Setiap thread memiliki stack memory sendiri dan berjalan secara independen.
Membuat Thread
use std::thread;
use std::time::Duration;
fn main() {
// Membuat thread baru
let handle = thread::spawn(|| {
for i in 1..=5 {
println!("Thread anak: iterasi {}", i);
thread::sleep(Duration::from_millis(100));
}
});
// Kode di main thread tetap berjalan
for i in 1..=3 {
println!("Main thread: iterasi {}", i);
thread::sleep(Duration::from_millis(150));
}
// Tunggu thread anak selesai
handle.join().unwrap();
println!("Semua thread selesai!");
// Output bisa bervariasi karena urutan tidak pasti:
// Main thread: iterasi 1
// Thread anak: iterasi 1
// Thread anak: iterasi 2
// Main thread: iterasi 2
// ...
}
// Beberapa thread sekaligus
fn contoh_banyak_thread() {
let mut handles = vec![];
for i in 0..5 {
let handle = thread::spawn(move || {
println!("Thread {} dimulai", i);
thread::sleep(Duration::from_millis(100));
println!("Thread {} selesai", i);
i * 10
});
handles.push(handle);
}
// Kumpulkan hasil dari semua thread
let mut hasil = vec![];
for handle in handles {
let nilai = handle.join().unwrap();
hasil.push(nilai);
}
println!("Hasil dari semua thread: {:?}", hasil);
}
Builder Pattern untuk Thread
use std::thread;
fn main() {
// Thread dengan nama custom
let builder = thread::Builder::new()
.name("worker-1".to_string())
.stack_size(4 * 1024 * 1024); // 4 MB stack
let handle = builder.spawn(|| {
println!("Thread name: {:?}", thread::current().name());
"hasil dari thread"
}).expect("Gagal membuat thread");
let hasil = handle.join().unwrap();
println!("Thread mengembalikan: {}", hasil);
// Jumlah CPU cores yang tersedia
let cores = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1);
println!("CPU cores tersedia: {}", cores);
}
3. Move Closures dan Threads
Thread membutuhkan move closure untuk mengambil ownership data yang dikirim ke thread. Ini memastikan data tetap valid selama thread berjalan.
use std::thread;
fn main() {
let nama = String::from("Budi");
let data = vec![1, 2, 3, 4, 5];
// move — pindahkan ownership ke thread
let handle = thread::spawn(move || {
println!("Halo, {}!", nama); // nama sekarang dimiliki oleh thread
println!("Data: {:?}", data); // data juga dimiliki oleh thread
data.iter().sum::<i32>() // kembalikan hasil
});
// println!("{}", nama); // ERROR! nama sudah dipindahkan
// println!("{:?}", data); // ERROR! data sudah dipindahkan
let total = handle.join().unwrap();
println!("Total: {}", total); // 15
}
// Jika ingin menggunakan data di main DAN thread, gunakan clone
fn contoh_clone() {
let data = vec![1, 2, 3];
let data_clone = data.clone(); // clone untuk thread
let handle = thread::spawn(move || {
println!("Thread: {:?}", data_clone);
data_clone.iter().sum::<i32>()
});
println!("Main: {:?}", data); // data asli masih bisa digunakan
let total = handle.join().unwrap();
println!("Total: {}", total);
}
// Lebih baik: gunakan Arc untuk shared ownership
fn contoh_arc() {
use std::sync::Arc;
let data = Arc::new(vec![1, 2, 3, 4, 5]);
let data_ref = Arc::clone(&data);
let handle = thread::spawn(move || {
println!("Thread: {:?}", data_ref);
data_ref.iter().sum::<i32>()
});
println!("Main: {:?}", data); // Bisa digunakan di main juga
let total = handle.join().unwrap();
println!("Total: {}", total);
}
4. Shared State: Mutex<T>
Mutex (Mutual Exclusion) memastikan hanya satu thread yang bisa mengakses data pada satu waktu. Mutex di Rust menggunakan sistem ownership untuk mencegah akses data tanpa lock.
use std::sync::Mutex;
fn main() {
// Mutex melindungi data
let m = Mutex::new(5);
{
// lock() mengembalikan MutexGuard (smart pointer)
let mut data = m.lock().unwrap();
*data = 10;
println!("Data di dalam lock: {}", data);
} // MutexGuard di-drop di sini → lock dilepaskan
println!("Data setelah lock: {:?}", m);
// Mutex dengan counter
let counter = Mutex::new(0);
// Increment counter dengan lock
for _ in 0..10 {
let mut num = counter.lock().unwrap();
*num += 1;
}
println!("Counter: {}", *counter.lock().unwrap()); // 10
}
// Masalah: Mutex sendiri TIDAK bisa di-share antar thread!
// Perlu Arc (Atomic Reference Counted)
// fn contoh_gagal() {
// let counter = Mutex::new(0);
// let handle = thread::spawn(move || {
// *counter.lock().unwrap() += 1;
// });
// *counter.lock().unwrap() += 1; // ERROR: counter sudah dipindahkan
// handle.join().unwrap();
// }
Menangani Poisoned Mutex
use std::sync::Mutex;
use std::thread;
fn main() {
let data = Mutex::new(vec![1, 2, 3]);
// Jika thread panic saat memegang lock, mutex "poisoned"
let handle = thread::spawn(move || {
let mut guard = data.lock().unwrap();
guard.push(4);
panic!("Thread crash!");
});
let _ = handle.join();
// lock() pada poisoned mutex mengembalikan Err
// unwrap() akan panic, tapi kita bisa handle:
// match data.lock() {
// Ok(guard) => println!("Data: {:?}", *guard),
// Err(poisoned) => {
// let guard = poisoned.into_inner();
// println!("Mutex poisoned, tapi data: {:?}", *guard);
// }
// }
}
5. Arc<T> — Atomic Reference Counting
Arc (Atomic Reference Counted) adalah versi thread-safe dari Rc. Arc + Mutex adalah kombinasi yang sangat umum untuk shared mutable state antar threads.
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
// Arc<Mutex<T>> — pola paling umum untuk shared mutable state
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for i in 0..10 {
let counter_ref = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter_ref.lock().unwrap();
*num += 1;
println!("Thread {} mengincrement counter ke {}", i, *num);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Final counter: {}", *counter.lock().unwrap()); // 10
}
// Contoh: shared data antar banyak thread
fn contoh_shared_data() {
let data = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
for i in 0..5 {
let data_ref = Arc::clone(&data);
let handle = thread::spawn(move || {
let mut vec = data_ref.lock().unwrap();
vec.push(i * 10);
println!("Thread {} menambahkan {}", i, i * 10);
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
let result = data.lock().unwrap();
println!("Hasil: {:?}", *result);
}
// Perbandingan Rc vs Arc
fn perbandingan() {
use std::rc::Rc;
use std::sync::Arc;
// Rc — single-threaded, lebih cepat
let rc = Rc::new(42);
let rc2 = Rc::clone(&rc);
// TIDAK bisa dikirim ke thread lain!
// Arc — multi-threaded, atomic operations
let arc = Arc::new(42);
let arc2 = Arc::clone(&arc);
// BISA dikirim ke thread lain ✅
}
6. Channels — Message Passing
Channels adalah cara lain untuk melakukan concurrency: alih-alih berbagi data, thread saling mengirim pesan. Ini disebut "message passing" concurrency, dipopulerkan oleh bahasa Go.
mpsc Channel
use std::sync::mpsc; // multi-producer, single-consumer
use std::thread;
use std::time::Duration;
fn main() {
// Membuat channel: tx = transmitter, rx = receiver
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let pesan = String::from("Halo dari thread!");
tx.send(pesan).unwrap();
// println!("{}", pesan); // ERROR! pesan sudah dipindahkan
});
// recv() memblokir sampai ada pesan
let diterima = rx.recv().unwrap();
println!("Diterima: {}", diterima);
}
// Mengirim beberapa pesan
fn contoh_banyak_pesan() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let pesan = vec![
String::from("halo"),
String::from("dari"),
String::from("thread"),
];
for p in pesan {
tx.send(p).unwrap();
thread::sleep(Duration::from_millis(200));
}
// tx di-drop di sini → channel tertutup
});
// Iterasi rx seperti iterator — otomatis berhenti saat channel tertutup
for diterima in rx {
println!("Diterima: {}", diterima);
}
}
// Multiple producers
fn contoh_multi_producer() {
let (tx, rx) = mpsc::channel();
// Clone tx untuk producer lain
for i in 0..3 {
let tx_clone = tx.clone();
thread::spawn(move || {
let pesan = format!("Pesan dari producer {}", i);
tx_clone.send(pesan).unwrap();
});
}
// Drop tx asli (semua clone sudah di thread)
drop(tx);
for diterima in rx {
println!("{}", diterima);
}
}
Sync Channel
use std::sync::mpsc;
use std::thread;
fn main() {
// sync_channel — buffer terbatas, sender memblokir jika penuh
let (tx, rx) = mpsc::sync_channel(3); // buffer 3 pesan
thread::spawn(move || {
for i in 0..10 {
println!("Mengirim pesan {}", i);
tx.send(i).unwrap(); // akan memblokir jika buffer penuh
println!("Pesan {} terkirim", i);
}
});
for diterima in rx {
println!("Menerima: {}", diterima);
thread::sleep(std::time::Duration::from_millis(500));
}
}
7. Send dan Sync Traits
Rust menggunakan dua marker traits untuk menjamin keamanan concurrency: Send dan Sync.
| Trait | Arti | Contoh |
|---|---|---|
Send | Tipe bisa dipindahkan ke thread lain | i32, String, Vec<T>, Arc<T> |
!Send | Tipe TIDAK bisa dikirim ke thread lain | Rc<T>, raw pointers |
Sync | Tipe bisa di-referensi dari beberapa thread | i32, &str, Arc<T>, Mutex<T> |
!Sync | Tipe TIDAK aman diakses beberapa thread | Rc<T>, Cell<T>, RefCell<T> |
use std::sync::Arc;
use std::rc::Rc;
use std::thread;
fn main() {
// ✅ Send: i32 bisa dikirim ke thread lain
let angka = 42;
let handle = thread::spawn(move || {
println!("Angka: {}", angka);
});
handle.join().unwrap();
// ✅ Send: String bisa dikirim
let nama = String::from("Budi");
let handle = thread::spawn(move || {
println!("Nama: {}", nama);
});
handle.join().unwrap();
// ❌ !Send: Rc TIDAK bisa dikirim ke thread
let rc = Rc::new(42);
// thread::spawn(move || { // COMPILE ERROR!
// println!("{}", rc);
// });
println!("Rc hanya untuk single-thread: {}", rc);
// ✅ Arc: thread-safe Rc
let arc = Arc::new(42);
let arc_clone = Arc::clone(&arc);
let handle = thread::spawn(move || {
println!("Arc di thread: {}", arc_clone);
});
handle.join().unwrap();
// Aturan praktis:
// - Hampir semua tipe standar adalah Send
// - Rc, Cell, RefCell adalah !Send
// - Arc, Mutex, RwLock adalah Send + Sync
// - Compiler secara otomatis mengecek Send/Sync
}
// Custom struct — otomatis Send jika semua field Send
#[derive(Debug)]
struct Config {
nama: String, // Send ✅
max_conn: u32, // Send ✅
}
// Config otomatis implements Send karena semua field-nya Send
fn kirim_config() {
let config = Config {
nama: String::from("server"),
max_conn: 100,
};
let handle = thread::spawn(move || {
println!("Config di thread: {:?}", config);
});
handle.join().unwrap();
}
8. Async/Await
Async/await adalah cara modern untuk menulis kode concurrent yang non-blocking. Berbeda dari threads yang berjalan di OS level, async menggunakan green threads (tasks) yang dijalankan oleh runtime seperti Tokio.
Perbedaan Thread vs Async
| Aspek | Threads | Async/Await |
|---|---|---|
| Overhead per unit | ~8MB stack per thread | ~ratusan byte per task |
| Context switch | OS scheduler | Runtime scheduler |
| Cocok untuk | CPU-bound work | I/O-bound work |
| Skalabilitas | ~ribuan thread | ~jutaan task |
| Runtime | Tidak perlu (OS) | Perlu (tokio, async-std) |
Konsep Dasar Future
// Future adalah trait yang merepresentasikan operasi async
// pub trait Future {
// type Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
// }
// async fn secara otomatis mengembalikan Future
async fn ambil_data(url: &str) -> Result<String, reqwest::Error> {
let response = reqwest::get(url).await?;
let body = response.text().await?;
Ok(body)
}
// async block
async fn proses() {
let data = ambil_data("https://api.example.com").await;
match data {
Ok(body) => println!("Data: {}", &body[..100]),
Err(e) => println!("Error: {}", e),
}
}
// .await "menunggu" Future selesai tanpa memblokir thread
// Runtime bisa menjalankan task lain sambil menunggu I/O
9. Tokio Runtime
Tokio adalah runtime async paling populer di ekosistem Rust. Tokio menyediakan scheduler, I/O reactor, timer, dan utilities async lainnya.
Setup Tokio
[dependencies]
tokio = { version = "1", features = ["full"] }
# features "full" mengaktifkan semua fitur tokio
# Alternatif: pilih fitur yang dibutuhkan saja
# tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "sync"] }
Menggunakan Tokio
use tokio::time::{sleep, Duration};
use tokio::fs;
// #[tokio::main] mengubah fn main menjadi async
#[tokio::main]
async fn main() {
println!("Program async dimulai!");
// Menjalankan beberapa async task secara concurrent
let handle1 = tokio::spawn(async {
sleep(Duration::from_millis(500)).await;
println!("Task 1 selesai");
10
});
let handle2 = tokio::spawn(async {
sleep(Duration::from_millis(300)).await;
println!("Task 2 selesai");
20
});
let handle3 = tokio::spawn(async {
sleep(Duration::from_millis(100)).await;
println!("Task 3 selesai");
30
});
// Tunggu semua task selesai
let r1 = handle1.await.unwrap();
let r2 = handle2.await.unwrap();
let r3 = handle3.await.unwrap();
println!("Total: {}", r1 + r2 + r3); // 60
// Semua task berjalan CONCURRENT, total waktu ~500ms bukan 900ms
}
// Tokio async file I/O
async fn baca_file_async() -> Result<(), Box<dyn std::error::Error>> {
let isi = fs::read_to_string("data.txt").await?;
println!("Isi file: {}", isi);
Ok(())
}
// Tokio select! — tunggu beberapa future, ambil yang pertama selesai
async fn contoh_select() {
tokio::select! {
_ = sleep(Duration::from_secs(1)) => {
println!("Timer 1 selesai duluan");
}
_ = sleep(Duration::from_millis(500)) => {
println!("Timer 2 selesai duluan");
}
}
}
Tokio Channels
use tokio::sync::{mpsc, oneshot, broadcast};
#[tokio::main]
async fn main() {
// mpsc channel — multi-producer, single-consumer
let (tx, mut rx) = mpsc::channel(32);
tokio::spawn(async move {
for i in 0..5 {
tx.send(format!("Pesan {}", i)).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
});
while let Some(msg) = rx.recv().await {
println!("Diterima: {}", msg);
}
// oneshot channel — satu nilai, satu kali
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
tx.send(42).unwrap();
});
let nilai = rx.await.unwrap();
println!("Oneshot: {}", nilai);
// broadcast channel — semua receiver mendapat semua pesan
let (tx, _) = broadcast::channel(16);
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
tokio::spawn(async move {
tx.send("hello").unwrap();
});
println!("rx1: {}", rx1.recv().await.unwrap()); // hello
println!("rx2: {}", rx2.recv().await.unwrap()); // hello
}
Tokio Mutex dan RwLock
use tokio::sync::{Mutex, RwLock};
use std::sync::Arc;
#[tokio::main]
async fn main() {
// Tokio Mutex — bisa di-await (tidak memblokir thread OS)
let data = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
for i in 0..5 {
let data_ref = Arc::clone(&data);
handles.push(tokio::spawn(async move {
let mut guard = data_ref.lock().await; // .await, bukan .unwrap()
guard.push(i);
}));
}
for h in handles {
h.await.unwrap();
}
println!("Data: {:?}", *data.lock().await);
// RwLock — banyak reader, satu writer
let config = Arc::new(RwLock::new(String::from("initial")));
// Banyak reader sekaligus — OK
let config_ref = Arc::clone(&config);
let reader1 = tokio::spawn(async move {
let data = config_ref.read().await;
println!("Reader: {}", *data);
});
// Writer — eksklusif
let config_ref = Arc::clone(&config);
let writer = tokio::spawn(async move {
let mut data = config_ref.write().await;
*data = String::from("updated");
});
reader1.await.unwrap();
writer.await.unwrap();
}
10. Concurrency Patterns
Berikut beberapa pola concurrency yang umum digunakan di Rust.
Pattern: Worker Pool
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
let data = Arc::new(Mutex::new(Vec::new()));
let mut handles = vec![];
// Buat 4 worker threads
for worker_id in 0..4 {
let data_ref = Arc::clone(&data);
let handle = thread::spawn(move || {
for task in 0..3 {
let hasil = worker_id * 100 + task;
println!("Worker {} memproses task {}", worker_id, task);
thread::sleep(std::time::Duration::from_millis(100));
let mut guard = data_ref.lock().unwrap();
guard.push(hasil);
}
});
handles.push(handle);
}
for h in handles {
h.join().unwrap();
}
let result = data.lock().unwrap();
println!("Semua hasil: {:?}", *result);
}
Pattern: Tokio Join dan Select
use tokio::time::{sleep, Duration};
async fn fetch_user() -> String {
sleep(Duration::from_millis(200)).await;
String::from("Budi")
}
async fn fetch_posts() -> Vec<String> {
sleep(Duration::from_millis(300)).await;
vec![String::from("Post 1"), String::from("Post 2")]
}
async fn fetch_stats() -> u32 {
sleep(Duration::from_millis(100)).await;
42
}
#[tokio::main]
async fn main() {
// tokio::join! — jalankan semua secara concurrent, tunggu semua selesai
let (user, posts, stats) = tokio::join!(
fetch_user(),
fetch_posts(),
fetch_stats(),
);
println!("User: {}", user);
println!("Posts: {:?}", posts);
println!("Stats: {}", stats);
// Total waktu: ~300ms (bukan 600ms)
// tokio::select! — ambil yang pertama selesai
tokio::select! {
user = fetch_user() => {
println!("User selesai duluan: {}", user);
}
posts = fetch_posts() => {
println!("Posts selesai duluan: {:?}", posts);
}
stats = fetch_stats() => {
println!("Stats selesai duluan: {}", stats);
}
}
// Hanya satu branch yang dieksekusi
}
Pattern: Timeout
use tokio::time::{timeout, Duration};
async fn operasi_lambat() -> String {
tokio::time::sleep(Duration::from_secs(5)).await;
String::from("selesai")
}
#[tokio::main]
async fn main() {
// timeout — batasi waktu tunggu
match timeout(Duration::from_secs(2), operasi_lambat()).await {
Ok(hasil) => println!("Berhasil: {}", hasil),
Err(_) => println!("Timeout! Operasi terlalu lama"),
}
// Pola retry
let mut attempts = 0;
let max_attempts = 3;
loop {
attempts += 1;
match timeout(Duration::from_millis(500), operasi_lambat()).await {
Ok(result) => {
println!("Berhasil pada attempt {}: {}", attempts, result);
break;
}
Err(_) if attempts < max_attempts => {
println!("Attempt {} timeout, retry...", attempts);
}
Err(_) => {
println!("Gagal setelah {} attempts", max_attempts);
break;
}
}
}
}
- Threads: untuk CPU-bound work (komputasi berat, processing data)
- Async/Tokio: untuk I/O-bound work (web server, database, network)
- Arc + Mutex: untuk shared state antar threads
- Channels: untuk message passing (komunikasi antar thread/task)
- tokio::join!: menjalankan beberapa operasi async bersamaan
- tokio::select!: menunggu yang pertama selesai dari beberapa operasi
📝 Quiz Pemahaman Concurrency
Uji pemahaman Anda tentang concurrency di Rust!