1. Mengapa Message Queue?
Message Queue adalah middleware yang memungkinkan aplikasi berkomunikasi secara asynchronous melalui pesan. Alih-alih service A langsung memanggil service B (synchronous), service A mengirim pesan ke queue, dan service B memprosesnya kapan pun siap.
Synchronous vs Asynchronous Communication
| Aspek | Synchronous (HTTP/gRPC) | Asynchronous (Message Queue) |
|---|---|---|
| Coupling | Tight β caller harus tunggu response | Loose β producer tidak tunggu consumer |
| Availability | Both services harus online | Consumer boleh offline |
| Scaling | Scale bersama | Scale independent |
| Latency | End-to-end latency | Producer latency sangat rendah |
| Error handling | Caller langsung dapat error | Retry + dead letter queue |
| Throughput | Bottleneck di service terlemah | Buffered β burst tolerance |
| Use case | CRUD, query real-time | Event processing, email sending, log processing |
Gunakan MQ saat: operasi tidak perlu selesai secara synchronous (kirim email, generate report), ada traffic burst yang perlu di-buffer, Anda butuh decoupling antar service, atau perlu broadcast event ke banyak consumer. Jangan gunakan MQ jika: user menunggu response langsung, atau flow sangat sederhana tanpa kebutuhan async.
2. RabbitMQ vs Kafka
Dua message broker paling populer β namun filosofi dan arsitekturnya sangat berbeda.
| Aspek | RabbitMQ | Apache Kafka |
|---|---|---|
| Tipe | Traditional message broker | Distributed event streaming platform |
| Model | Queue-based (push) | Log-based (pull) |
| Message retention | Hingga ACK (lalu dihapus) | Konfigurable (hari-bulan-tahun) |
| Message ordering | Per-queue (global ordering sulit) | Per-partition (garanteed) |
| Throughput | ~50K msg/detik | ~1M+ msg/detik |
| Protocol | AMQP, MQTT, STOMP | Custom binary protocol |
| Routing | Very flexible (direct, topic, fanout, headers) | Topic + partition key |
| Consumer model | Push-based (broker kirim ke consumer) | Pull-based (consumer tarik data) |
| Message replay | Tidak (sudah dihapus setelah ACK) | Ya (data tersimpan di log) |
| Complexity | Lebih sederhana setup | Lebih kompleks (ZooKeeper/KRaft) |
| Best for | Task queues, RPC, routing kompleks | Event streaming, log aggregation, data pipelines |
| Managed services | CloudAMQP, Amazon MQ | Confluent Cloud, Amazon MSK |
- RabbitMQ = Kantor Pos: Kirim surat, diantarkan ke penerima, lalu dihapus dari sistem. Routing fleksibel: langsung, broadcast, berdasarkan topik.
- Kafka = Koran: Data dipublish ke topik, subscriber bisa baca kapan saja, dan arsip tetap tersimpan. Cocok untuk streaming data real-time dan historical replay.
3. Publish/Subscribe Pattern
Dalam pattern Pub/Sub, publisher mengirim pesan ke topic/exchange, dan semua subscriber yang terdaftar akan menerima salinan pesan tersebut. Ini berbeda dari point-to-point (queue) di mana hanya satu consumer yang menerima.
RabbitMQ Pub/Sub (Fanout Exchange)
import pika
import json
# ===== PUBLISHER =====
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost')
)
channel = connection.channel()
# Deklarasi fanout exchange β broadcast ke semua queue yang ter-bind
channel.exchange_declare(
exchange='events',
exchange_type='fanout', # fanout = broadcast ke semua
durable=True # Survive broker restart
)
# Publish event
event = {
"type": "user.registered",
"data": {"user_id": 123, "name": "John"},
"timestamp": "2026-06-29T10:30:00Z"
}
channel.basic_publish(
exchange='events',
routing_key='', # Fanout: routing key di-ignore
body=json.dumps(event),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent
content_type='application/json'
)
)
# ===== SUBSCRIBER (Email Service) =====
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='fanout', durable=True)
# Buat exclusive queue (otomatis dihapus saat subscriber disconnect)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# Bind queue ke exchange
channel.queue_bind(exchange='events', queue=queue_name)
def callback(ch, method, properties, body):
event = json.loads(body)
if event['type'] == 'user.registered':
send_welcome_email(event['data']['name'])
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()
Kafka Pub/Sub (Consumer Group)
from kafka import KafkaProducer, KafkaConsumer
import json
# ===== KAFKA PRODUCER =====
producer = KafkaProducer(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8') if k else None,
acks='all', # Wait for all replicas
retries=3,
enable_idempotence=True # Exactly-once producer
)
# Publish ke topic dengan partition key
event = {
"type": "order.created",
"order_id": "ORD-123",
"amount": 150000,
"user_id": "user-456"
}
future = producer.send(
topic='orders',
key='user-456', # Partition key: semua event user ini di partition yang sama
value=event
)
record_metadata = future.get(timeout=10)
print(f"Published to {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")
# ===== KAFKA CONSUMER (Notification Service) =====
# Consumer group: SEMUA consumer dalam group yang SAMA
# akan berbagi partition (competing consumers)
# Consumer di group BERBEDA akan menerima SEMUA event (pub/sub)
consumer = KafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
group_id='notification-service', # Consumer group ID
auto_offset_reset='earliest', # Mulai dari awal jika belum ada offset
enable_auto_commit=False,
value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
for message in consumer:
print(f"Partition: {message.partition}, Offset: {message.offset}")
print(f"Key: {message.key}, Value: {message.value}")
# Process message
send_notification(message.value)
# Manual commit setelah berhasil diproses
consumer.commit()
4. Competing Consumers
Pattern Competing Consumers memungkinkan beberapa consumer membaca dari queue yang sama, dengan setiap pesan hanya diproses oleh satu consumer. Ini memungkinkan horizontal scaling dari processing.
# Competing Consumers Pattern: # Producer mengirim 1000 messages ke queue # # ββββββββββββ ββββββββββββββββββββ # β Producer βββββββ Task Queue β # ββββββββββββ β (1000 messages) β # ββββββββββ¬ββββββββββ # βββββββββββββββββΌββββββββββββββββ # βΌ βΌ βΌ # ββββββββββββ ββββββββββββ ββββββββββββ # βConsumer 1β βConsumer 2β βConsumer 3β # β(334 msg) β β(333 msg) β β(333 msg) β # ββββββββββββ ββββββββββββ ββββββββββββ # # β Load balancing otomatis! # β Consumer yang lebih cepat dapat lebih banyak # β Consumer bisa ditambah/dikurangi sesuai load # Di RabbitMQ: basic_qos(prefetch_count=N) # β Setiap consumer hanya ambil N message sekaligus # β Mencegah consumer overwhelmed # Di Kafka: consumer group dengan partition assignment # β Setiap partition hanya di-assign ke 1 consumer dalam group # β Consumer baru β rebalance β partition di-reassign
5. Dead Letter Queue (DLQ)
Dead Letter Queue adalah queue khusus untuk menampung pesan yang gagal diproses. Tanpa DLQ, pesan yang error bisa stuck di main queue dan di-retry terus-menerus (poison message), menghabiskan resource.
# Dead Letter Queue Flow: # 1. Message masuk ke main queue # 2. Consumer memproses β GAGAL (exception) # 3. Message di-reject (basic.nack) atau TTL expired # 4. Message dipindahkan ke Dead Letter Queue # 5. DLQ bisa dimonitor, di-reprocess, atau dianalisis # Alasan message masuk DLQ: # - Message di-reject (basic.nack/basic.reject) dengan requeue=false # - TTL (Time-To-Live) expired # - Queue mencapai max length # - Jumlah retry melebihi batas # ββββββββββββ ββββββββββββββββ βββββββββββββββββ # β Producer βββββββ Main Queue βββββββ Consumer β # ββββββββββββ β (retry β€3x) β β (proses msg) β # ββββββββ¬ββββββββ βββββββββ¬ββββββββ # β β # β (TTL/reject) β (gagal 3x) # βΌ βΌ # βββββββββββββββββββββββββββββββββ # β Dead Letter Queue (DLQ) β # ββββββββββββββββ¬βββββββββββββββββ # β # βΌ # ββββββββββββββββββββββββββββββββ # β DLQ Consumer / Monitor β # β - Log error ke Sentry β # β - Alert ops team β # β - Manual reprocess β # β - Fix code β requeue β # ββββββββββββββββββββββββββββββββ
DLQ Implementation di RabbitMQ
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 1. Deklarasi Dead Letter Exchange & Queue
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letter_queue', durable=True)
channel.queue_bind(queue='dead_letter_queue', exchange='dlx', routing_key='failed')
# 2. Deklarasi Main Queue dengan DLQ configuration
channel.queue_declare(
queue='order_processing',
durable=True,
arguments={
'x-dead-letter-exchange': 'dlx', # DLQ exchange
'x-dead-letter-routing-key': 'failed', # Routing key ke DLQ
'x-message-ttl': 300000, # TTL: 5 menit
'x-max-length': 10000, # Max queue length
'x-max-priority': 10 # Priority support
}
)
# 3. Consumer dengan retry logic
def process_order(ch, method, properties, body):
try:
order = json.loads(body)
# Simulasi processing yang bisa gagal
result = handle_order(order)
ch.basic_ack(delivery_tag=method.delivery_tag)
print(f"Order {order['id']} processed successfully")
except TemporaryError as e:
# Error sementara β requeue (akan dicoba lagi)
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=True
)
except PermanentError as e:
# Error permanen β kirim ke DLQ
ch.basic_nack(
delivery_tag=method.delivery_tag,
requeue=False # β Masuk DLQ karena requeue=False
)
print(f"Order sent to DLQ: {e}")
# 4. DLQ Monitor β proses pesan yang gagal
def handle_dead_letter(ch, method, properties, body):
failed_msg = json.loads(body)
headers = properties.headers or {}
# Cek berapa kali sudah di-retry
retry_count = headers.get('x-death', [{}])[0].get('count', 0)
print(f"DLQ Message: {failed_msg}")
print(f"Retry count: {retry_count}")
print(f"Original exchange: {headers.get('x-first-death-exchange')}")
# Simpan ke database untuk investigasi
save_to_error_log(failed_msg, retry_count)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(queue='dead_letter_queue', on_message_callback=handle_dead_letter)
channel.start_consuming()
6. Exactly-Once Processing
Exactly-once processing memastikan setiap message diproses tepat sekali β tidak lebih (duplicate), tidak kurang (miss). Ini adalah tantangan terbesar dalam distributed messaging.
# Delivery Semantics: # AT-LEAST-ONCE (paling umum): # β Message DIJAMIN sampai, tapi bisa DUPLIKAT # β Consumer ACK setelah proses, tapi message bisa dikirim ulang # β Consumer HARUS idempotent (proses 2x = hasil sama) # # Producer βββ Queue βββ Consumer # (gagal tanpa ACK) # βββ Consumer (retry β DUPLIKAT mungkin!) # # AT-MOST-ONCE: # β Message dikirim sekali, jika hilang ya sudah # β Fire-and-forget, tidak ada retry # β Cocok untuk telemetry/logging yang bisa hilang # EXACTLY-ONCE (ideal tapi sulit): # β Setiap message diproses TEPAT sekali # β Memerlukan kombinasi mekanisme: # 1. Idempotent producer (mencegah duplikat saat publish) # 2. Transactional publish (atomic write) # 3. Consumer offset commit dalam transaction # Kafka Exactly-Once (0.11+): # Producer β enable.idempotence=true β idempotent producer # Consumer β read_committed isolation level # Transaction β producer.beginTransaction() β send + commit offset β commitTransaction()
Idempotent Consumer Pattern
import redis
import hashlib
r = redis.Redis(host='localhost', port=6379)
def idempotent_consumer(message):
"""Consumer yang aman dari duplicate processing"""
# Buat unique ID untuk message
message_id = message.get('id') or hashlib.md5(
json.dumps(message, sort_keys=True).encode()
).hexdigest()
# Cek apakah sudah diproses
if r.exists(f"processed:{message_id}"):
print(f"Message {message_id} already processed, skipping")
return True # Already processed
# Set lock untuk mencegah concurrent processing
lock_key = f"processing:{message_id}"
if not r.set(lock_key, "1", nx=True, ex=300):
print(f"Message {message_id} being processed by another worker")
return False # Another worker is processing
try:
# Process message
result = do_actual_processing(message)
# Tandai sebagai processed (TTL: 24 jam)
r.setex(f"processed:{message_id}", 86400, "1")
# Simpan hasil untuk audit
r.setex(f"result:{message_id}", 86400, json.dumps(result))
print(f"Message {message_id} processed successfully")
return True
except Exception as e:
# Hapus lock agar bisa di-retry
r.delete(lock_key)
raise
finally:
r.delete(lock_key)
7. Messaging Patterns Lainnya
Request-Reply Pattern
# Request-Reply over Message Queue: # # 1. Producer mengirim request ke queue dengan correlation_id # 2. Producer menyimpan reply_to queue name # 3. Consumer memproses dan mengirim response ke reply_to queue # 4. Producer menerima response dengan correlation_id yang sama # ββββββββββββ ββββββββββββ # β Producer ββββ [request queue] ββ Consumerβ # β β β β # β βββ [reply queue] ββββββββββββ # ββββββββββββ ββββββββββββ # Penggunaan: Microservice RPC over MQ # Keuntungan: Loose coupling, built-in retry # Kelemahan: Lebih lambat dari direct HTTP
Event Sourcing Pattern
# Event Sourcing β simpan SEMUA peristiwa, bukan hanya state akhir
# Traditional (state-based):
# Account: { id: 1, balance: 800 }
# β Hanya tahu saldo akhir, tidak tahu sejarah
# Event Sourcing:
# Events:
# 1. AccountCreated { id: 1, balance: 0 }
# 2. MoneyDeposited { id: 1, amount: 1000 }
# 3. MoneyWithdrawn { id: 1, amount: 200 }
# β Tahu PERSIS bagaimana saldo 800 tercapai
# β Bisa replay events untuk rebuild state
# β Bisa project ke different read models
# Kafka sangat cocok untuk event sourcing karena:
# - Log-based storage (events tersimpan selamanya)
# - Ordered per partition
# - Consumer bisa replay dari offset tertentu
# - Compact topic: simpan hanya event terakhir per key
8. Implementasi Lengkap
RabbitMQ Docker Setup
# docker-compose.yml
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
environment:
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: password
volumes:
- rabbitmq_data:/var/lib/rabbitmq
kafka:
image: confluentinc/cp-kafka:7.6.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker,controller
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LOG_RETENTION_HOURS: 168 # 7 hari
KAFKA_NUM_PARTITIONS: 3
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
volumes:
- kafka_data:/var/lib/kafka/data
volumes:
rabbitmq_data:
kafka_data: