1. ⥠Apa Itu Concurrency?
Concurrency adalah kemampuan program untuk menjalankan beberapa tugas secara bersamaan. Go memiliki fitur concurrency yang sangat kuat dan mudah digunakan melalui goroutines dan channels.
Perbedaan penting: Concurrency â Parallelism. Concurrency adalah tentang menangani banyak tugas sekaligus (bisa bergantian), sedangkan parallelism adalah menjalankan banyak tugas secara literal bersamaan (butuh multi-core CPU).
Concurrency vs Parallelism
| Aspek | Concurrency | Parallelism |
|---|---|---|
| Definisi | Menangani banyak tugas sekaligus | Menjalankan banyak tugas bersamaan |
| Analogi | 1 kasir melayani banyak pelanggan bergantian | Banyak kasir melayani pelanggan bersamaan |
| CPU | Bisa single-core | Butuh multi-core |
| Di Go | Goroutines | GOMAXPROCS > 1 (default = semua core) |
| Tujuan | Mengatur struktur program | Menjalankan lebih cepat |
âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ â CONCURRENCY (1 CPU) â â â â Task A: ââââââââââââââââââââ â â Task B: ââââââââââââââââââââââââ â â Time âââââââââââââââââââââââââââ â â â â PARALLELISM (Multi CPU) â â â â CPU 1 - Task A: ââââââââââââââââââââ â â CPU 2 - Task B: ââââââââââââââââââââ â â Time ââââââââââââââââââââââââââââ â âââââââââââââââââââââââââââââââââââââââââââââââââââââââââââ
Go Concurrency Model
Go mengikuti filosofi "Don't communicate by sharing memory; share memory by communicating." â Rob Pike. Artinya, gunakan channels untuk komunikasi antar goroutine, bukan shared variable.
2. Goroutines
Goroutine adalah lightweight thread yang dikelola oleh Go runtime. Goroutine jauh lebih ringan dari OS thread â hanya membutuhkan ~2KB memory (vs ~1MB untuk thread biasa). Anda bisa menjalankan ribuan hingga jutaan goroutine secara bersamaan!
package main
import (
"fmt"
"time"
)
func cetakPesan(pesan string, delay int) {
for i := 0; i < 3; i++ {
fmt.Printf("[%s] %s (ke-%d)\n", time.Now().Format("15:04:05"), pesan, i+1)
time.Sleep(time.Duration(delay) * time.Millisecond)
}
}
func main() {
fmt.Println("=== Tanpa Goroutine (Sequential) ===")
start := time.Now()
cetakPesan("Tugas A", 100)
cetakPesan("Tugas B", 100)
fmt.Printf("Selesai dalam: %v\n\n", time.Since(start))
fmt.Println("=== Dengan Goroutine (Concurrent) ===")
start = time.Now()
// Menjalankan sebagai goroutine dengan keyword 'go'
go cetakPesan("Tugas A", 100)
go cetakPesan("Tugas B", 100)
go cetakPesan("Tugas C", 100)
// Tunggu sebentar agar goroutine selesai
time.Sleep(500 * time.Millisecond)
fmt.Printf("Selesai dalam: %v\n", time.Since(start))
// Output: jauh lebih cepat karena berjalan bersamaan!
}
Anonim Goroutine
package main
import (
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
// Anonymous goroutine dengan WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
fmt.Printf("Goroutine %d mulai\n", id)
time.Sleep(time.Duration(id*100) * time.Millisecond)
fmt.Printf("Goroutine %d selesai\n", id)
}(i) // Penting: pass 'i' sebagai parameter!
}
fmt.Println("Menunggu semua goroutine selesai...")
wg.Wait()
fmt.Println("Semua goroutine selesai! â
")
}
Saat menggunakan goroutine dalam loop, selalu pass variabel loop sebagai parameter ke fungsi anonim. Jika tidak, semua goroutine akan membaca variabel yang sama dan menghasilkan nilai yang tidak terduga (race condition).
Goroutine vs Thread
| Aspek | Goroutine | OS Thread |
|---|---|---|
| Memory | ~2-8 KB awal | ~1 MB |
| Creation | ~0.3 Ξs | ~30 Ξs |
| Context Switch | ~100 ns (user space) | ~1-10 Ξs (kernel) |
| Max Count | Jutaan | Ribuan |
| Scheduling | Go runtime (M:N) | OS kernel (1:1) |
| Komunikasi | Channels (aman) | Shared memory (berbahaya) |
3. Channels
Channel adalah conduit (pipa) untuk komunikasi antar goroutine. Channel memastikan data dikirim dan diterima secara aman tanpa race condition. Ini adalah inti dari motto Go: "Share memory by communicating."
package main
import "fmt"
func main() {
// Membuat channel (unbuffered)
ch := make(chan string)
// Goroutine mengirim data ke channel
go func() {
ch <- "Halo dari goroutine!"
}()
// Menerima data dari channel (blocking!)
pesan := <-ch
fmt.Println(pesan) // Halo dari goroutine!
// Channel dengan tipe data
intCh := make(chan int)
go func() {
for i := 1; i <= 5; i++ {
intCh <- i * 10
}
close(intCh) // Tutup channel setelah selesai
}()
// Menerima dari channel yang sudah di-close
for nilai := range intCh {
fmt.Println(nilai) // 10, 20, 30, 40, 50
}
}
Channel Direction
package main
import "fmt"
// Channel hanya untuk mengirim (write-only)
func producer(ch chan<- int) {
for i := 0; i < 5; i++ {
ch <- i * i
}
close(ch)
}
// Channel hanya untuk menerima (read-only)
func consumer(ch <-chan int) {
for val := range ch {
fmt.Printf("Menerima: %d\n", val)
}
}
func main() {
ch := make(chan int)
go producer(ch) // Mengirim ke channel
consumer(ch) // Menerima dari channel
// Channel direction meningkatkan type safety:
// chan<- int = send-only channel
// <-chan int = receive-only channel
// chan int = bidirectional channel
}
Contoh: Fan-Out Fan-In
package main
import (
"fmt"
"sync"
)
// Fan-In: menggabungkan beberapa channel menjadi satu
func fanIn(channels ...<-chan string) <-chan string {
var wg sync.WaitGroup
merged := make(chan string)
for _, ch := range channels {
wg.Add(1)
go func(c <-chan string) {
defer wg.Done()
for val := range c {
merged <- val
}
}(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
func producer(id int) <-chan string {
ch := make(chan string)
go func() {
for i := 1; i <= 3; i++ {
ch <- fmt.Sprintf("Producer %d - Item %d", id, i)
}
close(ch)
}()
return ch
}
func main() {
// Fan-Out: 3 producer berjalan paralel
ch1 := producer(1)
ch2 := producer(2)
ch3 := producer(3)
// Fan-In: gabungkan semua output ke satu channel
merged := fanIn(ch1, ch2, ch3)
for pesan := range merged {
fmt.Println(pesan)
}
fmt.Println("Semua producer selesai! â
")
}
4. Buffered Channels
Channel default bersifat unbuffered â pengirim harus menunggu sampai ada penerima. Buffered channel memiliki kapasitas penyimpanan, sehingga pengirim bisa mengirim beberapa data tanpa menunggu penerima.
package main
import (
"fmt"
"time"
)
func main() {
// Unbuffered channel â blocking
ch1 := make(chan string)
go func() {
ch1 <- "data" // Blok sampai ada yang menerima
}()
fmt.Println(<-ch1)
// Buffered channel â kapasitas 3
ch2 := make(chan int, 3)
// Kirim tanpa blocking (sampai buffer penuh)
ch2 <- 1
ch2 <- 2
ch2 <- 3
// ch2 <- 4 // AKAN DEADLOCK! Buffer penuh
fmt.Println(<-ch2) // 1
fmt.Println(<-ch2) // 2
fmt.Println(<-ch2) // 3
fmt.Printf("Len: %d, Cap: %d\n", len(ch2), cap(ch2))
// Len: 0 (sudah diambil semua), Cap: 3
// Contoh praktik: Rate limiter
limiter := make(chan time.Time, 5)
go func() {
for i := 0; i < 10; i++ {
limiter <- time.Now()
time.Sleep(200 * time.Millisecond)
}
close(limiter)
}()
for t := range limiter {
fmt.Printf("Request diproses pada: %s\n", t.Format("15:04:05.000"))
}
}
Gunakan unbuffered saat butuh sinkronisasi (pastikan data diterima). Gunakan buffered saat produksi lebih cepat dari konsumsi, atau sebagai rate limiter. Jangan terlalu besar â buffer yang berlebihan menyembunyikan masalah (bufferbloat).
5. Select Statement
select memungkinkan goroutine menunggu beberapa channel sekaligus. Mirip seperti switch, tapi untuk channel operations.
package main
import (
"fmt"
"time"
)
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "Response dari API 1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "Response dari API 2"
}()
// Select â tunggu channel yang pertama ready
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Diterima:", msg1)
case msg2 := <-ch2:
fmt.Println("Diterima:", msg2)
}
}
// Select dengan timeout
timeoutCh := make(chan string)
go func() {
time.Sleep(3 * time.Second) // Simulasi proses lama
timeoutCh <- "Selesai!"
}()
select {
case result := <-timeoutCh:
fmt.Println("Hasil:", result)
case <-time.After(1 * time.Second):
fmt.Println("Timeout! â° Proses terlalu lama")
}
// Non-blocking select dengan default
dataCh := make(chan string, 1)
dataCh <- "test"
select {
case val := <-dataCh:
fmt.Println("Ada data:", val)
default:
fmt.Println("Channel kosong, lanjut...")
}
}
Contoh: Fan-Out dengan Select
package main
import (
"fmt"
"math/rand"
"time"
)
func fetchData(name string, ch chan<- string) {
delay := time.Duration(rand.Intn(3)+1) * time.Second
time.Sleep(delay)
ch <- fmt.Sprintf("[%s] Data siap setelah %v", name, delay)
}
func main() {
rand.Seed(time.Now().UnixNano())
api1 := make(chan string, 1)
api2 := make(chan string, 1)
api3 := make(chan string, 1)
go fetchData("Google", api1)
go fetchData("GitHub", api2)
go fetchData("NewsAPI", api3)
// Tunggu semua dengan select (yang pertama selesai duluan)
for i := 0; i < 3; i++ {
select {
case res := <-api1:
fmt.Println(res)
case res := <-api2:
fmt.Println(res)
case res := <-api3:
fmt.Println(res)
}
}
}
6. Sync Package
Package sync menyediakan primitive sinkronisasi seperti WaitGroup, Mutex, Once, dan Pool.
WaitGroup
package main
import (
"fmt"
"sync"
"time"
)
func downloadFile(url string, wg *sync.WaitGroup) {
defer wg.Done() // Kurangi counter saat selesai
fmt.Printf("ðĨ Mulai download: %s\n", url)
time.Sleep(time.Duration(len(url)) * 100 * time.Millisecond)
fmt.Printf("â
Selesai download: %s\n", url)
}
func main() {
var wg sync.WaitGroup
urls := []string{
"https://api.example.com/users",
"https://api.example.com/products",
"https://api.example.com/orders",
"https://api.example.com/reports",
}
for _, url := range urls {
wg.Add(1) // Tambah counter
go downloadFile(url, &wg)
}
fmt.Println("Menunggu semua download selesai...")
wg.Wait() // Block sampai counter = 0
fmt.Println("ð Semua file berhasil didownload!")
}
sync.Once â Hanya Sekali
package main
import (
"fmt"
"sync"
)
type Config struct {
DBHost string
DBPort int
}
var (
config *Config
once sync.Once
)
// GetConfig â singleton pattern dengan sync.Once
func GetConfig() *Config {
once.Do(func() {
fmt.Println("ð§ Loading config... (hanya sekali)")
config = &Config{
DBHost: "localhost",
DBPort: 5432,
}
})
return config
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
cfg := GetConfig()
fmt.Printf("Worker %d: DB=%s:%d\n", id, cfg.DBHost, cfg.DBPort)
}(i)
}
wg.Wait()
// "Loading config..." hanya dicetak sekali!
}
sync.Map â Thread-Safe Map
package main
import (
"fmt"
"sync"
)
func main() {
var cache sync.Map
// Store â simpan data
cache.Store("user:1", "Budi")
cache.Store("user:2", "Ani")
cache.Store("user:3", "Dimas")
// Load â ambil data
val, ok := cache.Load("user:1")
if ok {
fmt.Println(val) // Budi
}
// LoadOrStore â ambil atau simpan jika belum ada
actual, loaded := cache.LoadOrStore("user:4", "Sari")
fmt.Printf("Value: %s, Already existed: %t\n", actual, loaded)
// Delete â hapus
cache.Delete("user:2")
// Range â iterasi semua
cache.Range(func(key, value any) bool {
fmt.Printf("%v: %v\n", key, value)
return true // return false untuk berhenti
})
}
7. Mutex & RWMutex
Mutex (Mutual Exclusion) melindungi shared resource dari concurrent access. Gunakan mutex saat beberapa goroutine perlu mengakses dan memodifikasi data yang sama.
package main
import (
"fmt"
"sync"
)
// SafeCounter â thread-safe counter
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) Get() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.count
}
func main() {
counter := SafeCounter{}
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Final count:", counter.Get()) // Selalu 1000
// Tanpa mutex, hasilnya bisa kurang dari 1000 (race condition!)
}
RWMutex â Read-Write Mutex
package main
import (
"fmt"
"sync"
"time"
)
type SafeCache struct {
mu sync.RWMutex
data map[string]string
}
func NewSafeCache() *SafeCache {
return &SafeCache{data: make(map[string]string)}
}
func (c *SafeCache) Set(key, value string) {
c.mu.Lock() // Exclusive lock â hanya 1 writer
defer c.mu.Unlock()
c.data[key] = value
}
func (c *SafeCache) Get(key string) (string, bool) {
c.mu.RLock() // Shared lock â bisa banyak reader
defer c.mu.RUnlock()
val, ok := c.data[key]
return val, ok
}
func main() {
cache := NewSafeCache()
var wg sync.WaitGroup
// Writers
for i := 0; i < 5; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
key := fmt.Sprintf("key-%d", id)
cache.Set(key, fmt.Sprintf("value-%d", id))
}(i)
}
// Readers
for i := 0; i < 10; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
time.Sleep(10 * time.Millisecond) // Tunggu writer
val, ok := cache.Get(fmt.Sprintf("key-%d", id%5))
if ok {
fmt.Printf("Reader %d: %s\n", id, val)
}
}(i)
}
wg.Wait()
fmt.Println("Selesai â
")
}
8. Worker Pool Pattern
Worker Pool adalah pattern paling umum di Go concurrency. Batasi jumlah goroutine yang berjalan bersamaan untuk menghindari resource exhaustion.
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
JobID int
Output string
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf(" Worker %d memproses Job %d\n", id, job.ID)
time.Sleep(time.Duration(rand.Intn(500)+200) * time.Millisecond)
results <- Result{
JobID: job.ID,
Output: fmt.Sprintf("Job '%s' selesai oleh Worker %d", job.Data, id),
}
}
fmt.Printf(" Worker %d selesai\n", id)
}
func main() {
const numJobs = 10
const numWorkers = 3
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// Start workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// Send jobs
for j := 1; j <= numJobs; j++ {
jobs <- Job{ID: j, Data: fmt.Sprintf("task-%d", j)}
}
close(jobs)
// Close results setelah semua worker selesai
go func() {
wg.Wait()
close(results)
}()
// Collect results
for result := range results {
fmt.Printf("â
Result: %s\n", result.Output)
}
fmt.Println("\nð Semua job selesai!")
}
9. Pipeline Pattern
Pipeline menghubungkan beberapa stage yang masing-masing membaca dari input channel, memproses, dan mengirim ke output channel. Setiap stage berjalan sebagai goroutine terpisah.
package main
import (
"fmt"
"strings"
)
// Stage 1: Generate angka
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// Stage 2: Filter â hanya ambil angka genap
func filter(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if n%2 == 0 {
out <- n
}
}
close(out)
}()
return out
}
// Stage 3: Transform â kuadratkan
func transform(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
// Pipeline String
func toUpper(in <-chan string) <-chan string {
out := make(chan string)
go func() {
for s := range in {
out <- strings.ToUpper(s)
}
close(out)
}()
return out
}
func addPrefix(in <-chan string, prefix string) <-chan string {
out := make(chan string)
go func() {
for s := range in {
out <- prefix + ": " + s
}
close(out)
}()
return out
}
func main() {
// Pipeline: generate â filter â transform
// Input: 1-10 â Genap â Kuadrat
nums := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
evens := filter(nums)
squares := transform(evens)
fmt.Println("=== Number Pipeline ===")
for result := range squares {
fmt.Println(result) // 4, 16, 36, 64, 100
}
// Pipeline String
fmt.Println("\n=== String Pipeline ===")
words := make(chan string)
go func() {
for _, w := range []string{"hello", "world", "golang"} {
words <- w
}
close(words)
}()
upper := toUpper(words)
prefixed := addPrefix(upper, "LOG")
for msg := range prefixed {
fmt.Println(msg) // LOG: HELLO, LOG: WORLD, LOG: GOLANG
}
}
10. Context & Timeout
context digunakan untuk mengelola lifecycle goroutine, mengirim sinyal cancellation, dan mengatur deadline/timeout. Sangat penting untuk HTTP handler, database query, dan operasi jaringan.
package main
import (
"context"
"fmt"
"time"
)
func fetchData(ctx context.Context, name string) string {
select {
case <-time.After(2 * time.Second): // Simulasi proses lama
return fmt.Sprintf("Data %s berhasil", name)
case <-ctx.Done():
return fmt.Sprintf("Data %s dibatalkan: %v", name, ctx.Err())
}
}
func main() {
// Context dengan timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() // Selalu panggil cancel!
result := fetchData(ctx, "users")
fmt.Println(result)
// Output: "Data users dibatalkan: context deadline exceeded"
// Context dengan value
ctx2 := context.WithValue(context.Background(), "requestID", "abc-123")
requestID := ctx2.Value("requestID")
fmt.Println("Request ID:", requestID)
// Context dengan cancel manual
ctx3, cancel3 := context.WithCancel(context.Background())
go func() {
result := fetchData(ctx3, "products")
fmt.Println(result)
}()
time.Sleep(500 * time.Millisecond)
cancel3() // Batalkan!
time.Sleep(100 * time.Millisecond)
// Contoh: batch request dengan timeout
fmt.Println("\n=== Batch Request ===")
services := []string{"users", "products", "orders", "reports"}
batchCtx, batchCancel := context.WithTimeout(context.Background(), 3*time.Second)
defer batchCancel()
for _, svc := range services {
go func(s string) {
fmt.Println(fetchData(batchCtx, s))
}(svc)
}
time.Sleep(4 * time.Second)
}
11. Race Condition & Best Practices
Race condition terjadi ketika dua atau lebih goroutine mengakses shared resource secara bersamaan tanpa sinkronisasi. Go memiliki tool bawaan untuk mendeteksi race condition.
# Jalankan dengan -race flag untuk mendeteksi race condition:
go run -race main.go
go test -race ./...
# Contoh race condition (JANGAN ditiru!):
var counter int
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // RACE CONDITION! â ïļ
}()
}
wg.Wait()
// Hasil: bisa 987, 993, atau apapun â tidak konsisten!
# Solusi: gunakan Mutex atau atomic operations
# Best Practices:
# 1. Hindari shared state â gunakan channels
# 2. Gunakan sync.WaitGroup untuk menunggu goroutine
# 3. Selalu gunakan -race saat testing
# 4. Hindari goroutine leak â pastikan semua goroutine selesai
# 5. Gunakan context untuk cancellation
# 6. Jangan gunakan panic di goroutine (gunakan recover)
# 7. Close channel dari sisi pengirim, bukan penerima
Pastikan setiap goroutine memiliki cara untuk berhenti. Goroutine yang tidak pernah selesai (leak) akan menghabiskan memory. Gunakan context, done channel, atau timeout untuk memastikan goroutine bisa dihentikan.
12. Quiz: Uji Pemahamanmu!
Setelah membaca tutorial di atas, jawablah 5 pertanyaan berikut untuk menguji pemahamanmu tentang Go Concurrency: