Message Queue Patterns

A complete guide to messaging architecture: RabbitMQ vs Kafka, pub/sub, competing consumers, dead letter queues, exactly-once processing, event sourcing, and asynchronous communication patterns for microservices

1. Why a Message Queue?

A message queue is middleware that lets applications communicate asynchronously through messages. Instead of service A calling service B directly (synchronous), service A sends a message to a queue, and service B processes it whenever it is ready.
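
The decoupling described above can be sketched without any broker. This is a minimal in-process illustration, assuming Python's standard `queue.Queue` as a stand-in for the broker; the `handle` function and the `email-N` payloads are hypothetical. The producer returns as soon as its messages are enqueued, and the consumer drains the backlog when it comes online later.

```python
import queue
import threading
import time

def handle(msg):
    """Hypothetical slow consumer-side work (e.g. sending an email)."""
    time.sleep(0.05)
    return f"handled {msg}"

def consumer(q, results):
    # Service B: pull messages whenever it is ready; None is a stop signal.
    while True:
        msg = q.get()
        if msg is None:
            break
        results.append(handle(msg))

q = queue.Queue()   # stands in for the broker
results = []

# Service A: enqueue and return immediately, without waiting for service B.
start = time.perf_counter()
for i in range(3):
    q.put(f"email-{i}")
producer_seconds = time.perf_counter() - start

# Service B comes online later and drains the backlog.
worker = threading.Thread(target=consumer, args=(q, results))
worker.start()
q.put(None)
worker.join()

print(f"producer finished in {producer_seconds:.4f}s")
print(results)
```

A real broker adds what this sketch lacks: persistence, delivery across processes and machines, and acknowledgements.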

Synchronous vs Asynchronous Communication

| Aspect | Synchronous (HTTP/gRPC) | Asynchronous (Message Queue) |
|---|---|---|
| Coupling | Tight: the caller must wait for the response | Loose: the producer does not wait for the consumer |
| Availability | Both services must be online | The consumer may be offline |
| Scaling | Services scale together | Services scale independently |
| Latency | The caller sees end-to-end latency | Producer-side latency is very low |
| Error handling | The caller gets the error immediately | Retry + dead letter queue |
| Throughput | Bottlenecked by the slowest service | Buffered, tolerates bursts |
| Use case | CRUD, real-time queries | Event processing, email sending, log processing |

💡 When to Use a Message Queue?

Use a message queue when: the operation does not need to complete synchronously (sending email, generating a report), traffic bursts need to be buffered, you need decoupling between services, or an event must be broadcast to many consumers. Do not use one when the user is waiting for an immediate response, or when the flow is simple and has no need for asynchronous processing.

2. RabbitMQ vs Kafka

These are the two most popular message brokers, but their philosophies and architectures are very different.

| Aspect | RabbitMQ | Apache Kafka |
|---|---|---|
| Type | Traditional message broker | Distributed event streaming platform |
| Model | Queue-based (push) | Log-based (pull) |
| Message retention | Until ACK (then deleted) | Configurable (days, months, years) |
| Message ordering | Per-queue (global ordering is hard) | Per-partition (guaranteed) |
| Throughput | ~50K msg/s (rough figure, depends on hardware and configuration) | ~1M+ msg/s (rough figure, depends on hardware and configuration) |
| Protocol | AMQP, MQTT, STOMP | Custom binary protocol |
| Routing | Very flexible (direct, topic, fanout, headers) | Topic + partition key |
| Consumer model | Push-based (the broker sends to the consumer) | Pull-based (the consumer fetches data) |
| Message replay | No (deleted after ACK) | Yes (data is kept in the log) |
| Complexity | Simpler to set up | More complex (ZooKeeper/KRaft) |
| Best for | Task queues, RPC, complex routing | Event streaming, log aggregation, data pipelines |
| Managed services | CloudAMQP, Amazon MQ | Confluent Cloud, Amazon MSK |

📚 A Simple Analogy
  • RabbitMQ = Post office: a letter is sent, delivered to its recipient, then removed from the system. Routing is flexible: direct, broadcast, or topic-based.
  • Kafka = Newspaper: data is published to a topic, subscribers can read it at any time, and the archive is kept. This suits real-time data streaming and historical replay.
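
The difference between the two storage models in the analogy can be sketched in a few lines. This is an in-memory illustration only, not either broker's API; the class names `AckQueue` and `Log` are hypothetical.

```python
from collections import deque

class AckQueue:
    """Queue semantics: a message is gone once it is consumed and acknowledged."""
    def __init__(self):
        self._messages = deque()

    def publish(self, msg):
        self._messages.append(msg)

    def consume(self):
        # Deliver and acknowledge in one step, for brevity.
        return self._messages.popleft() if self._messages else None

class Log:
    """Log semantics: records stay; each reader tracks its own offset."""
    def __init__(self):
        self._records = []

    def append(self, record):
        self._records.append(record)

    def read(self, offset):
        return self._records[offset:]

q = AckQueue()
log = Log()
for event in ["a", "b", "c"]:
    q.publish(event)
    log.append(event)

first_pass = [q.consume() for _ in range(3)]
second_pass = q.consume()   # nothing left to replay
replay_all = log.read(0)    # a new reader can start from the beginning
replay_tail = log.read(2)   # or resume from a stored offset

print(first_pass, second_pass)   # ['a', 'b', 'c'] None
print(replay_all, replay_tail)   # ['a', 'b', 'c'] ['c']
```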

3. Publish/Subscribe Pattern

In the pub/sub pattern, a publisher sends a message to a topic or exchange, and every registered subscriber receives a copy of it. This differs from point-to-point (queue) delivery, where only one consumer receives each message.

RabbitMQ Pub/Sub (Fanout Exchange)

Python
import pika
import json

# ===== PUBLISHER =====
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# Declare a fanout exchange: it broadcasts to every queue bound to it
channel.exchange_declare(
    exchange='events',
    exchange_type='fanout',  # fanout = broadcast to all bound queues
    durable=True             # Survive broker restart
)

# Publish event
event = {
    "type": "user.registered",
    "data": {"user_id": 123, "name": "John"},
    "timestamp": "2026-06-29T10:30:00Z"
}

channel.basic_publish(
    exchange='events',
    routing_key='',  # Fanout: the routing key is ignored
    body=json.dumps(event),
    properties=pika.BasicProperties(
        delivery_mode=2,  # Persistent
        content_type='application/json'
    )
)

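# ===== SUBSCRIBER (Email Service) =====
# Normally a separate process with its own connection. Start it before
# publishing: a fanout exchange drops messages when no queue is bound.
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()
channel.exchange_declare(exchange='events', exchange_type='fanout', durable=True)

# Create an exclusive queue (deleted automatically when the subscriber disconnects)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue

# Bind the queue to the exchange
channel.queue_bind(exchange='events', queue=queue_name)

def send_welcome_email(name):
    # Placeholder: replace with the real email integration
    print(f"Sending welcome email to {name}")

def callback(ch, method, properties, body):
    event = json.loads(body)
    if event['type'] == 'user.registered':
        send_welcome_email(event['data']['name'])
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue=queue_name, on_message_callback=callback)
channel.start_consuming()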

Kafka Pub/Sub (Consumer Group)

Python
from kafka import KafkaProducer, KafkaConsumer
import json

# ===== KAFKA PRODUCER =====
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8') if k else None,
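    acks='all',  # Wait for all in-sync replicas
    retries=3,
    # Idempotent producer: retried sends are not written twice. This is not
    # end-to-end exactly-once, and it needs a client release that supports it.
    enable_idempotence=True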
)

# Publish to a topic with a partition key
event = {
    "type": "order.created",
    "order_id": "ORD-123",
    "amount": 150000,
    "user_id": "user-456"
}

future = producer.send(
    topic='orders',
    key='user-456',  # Partition key: all events for this user land on the same partition
    value=event
)
record_metadata = future.get(timeout=10)
print(f"Published to {record_metadata.topic} partition {record_metadata.partition} offset {record_metadata.offset}")

# ===== KAFKA CONSUMER (Notification Service) =====
# Consumer group: ALL consumers in the SAME group
# share the partitions (competing consumers).
# Consumers in DIFFERENT groups each receive ALL events (pub/sub).
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    group_id='notification-service',  # Consumer group ID
    auto_offset_reset='earliest',     # Start from the beginning if no offset is stored yet
    enable_auto_commit=False,
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

for message in consumer:
    print(f"Partition: {message.partition}, Offset: {message.offset}")
    print(f"Key: {message.key}, Value: {message.value}")

    # Process the message (send_notification is an application-specific placeholder)
    send_notification(message.value)

    # Commit manually once processing has succeeded
    consumer.commit()
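
The effect of the partition key in the producer example can be illustrated with a small sketch. It assumes a stand-in hash (`zlib.crc32`); Kafka's default partitioner uses murmur2, so the actual partition numbers will differ. The `partition_for` helper and the sample events are hypothetical. What it shows is only the property that matters: the same key always maps to the same partition, so one user's events stay in order.

```python
import zlib

def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32.
    return zlib.crc32(key.encode("utf-8")) % num_partitions

events = [
    ("user-456", "order.created"),
    ("user-789", "order.created"),
    ("user-456", "order.paid"),
    ("user-456", "order.shipped"),
]

# Append each event to the partition chosen by its key.
partitions = {p: [] for p in range(3)}
for key, event_type in events:
    partitions[partition_for(key, 3)].append((key, event_type))

user_partition = partition_for("user-456", 3)
user_events = [e for k, e in partitions[user_partition] if k == "user-456"]
print(user_events)  # ['order.created', 'order.paid', 'order.shipped']
```

Ordering holds only within a partition. Events for different keys may land on different partitions and be consumed in any relative order.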

4. Competing Consumers

The competing consumers pattern lets several consumers read from the same queue, with each message processed by only one of them. This makes it possible to scale processing horizontally.

Text
# Competing Consumers Pattern:

# The producer sends 1000 messages to the queue
#
# ┌──────────┐     ┌──────────────────┐
# │ Producer │────→│  Task Queue      │
# └──────────┘     │  (1000 messages) │
#                  └────────┬─────────┘
#           ┌───────────────┼───────────────┐
#           ▼               ▼               ▼
#    ┌──────────┐    ┌──────────┐    ┌──────────┐
#    │Consumer 1│    │Consumer 2│    │Consumer 3│
#    │(334 msg) │    │(333 msg) │    │(333 msg) │
#    └──────────┘    └──────────┘    └──────────┘
#
# → Automatic load balancing
# → Faster consumers receive more messages
# → Consumers can be added or removed to match the load

# In RabbitMQ: basic_qos(prefetch_count=N)
# → Each consumer holds at most N unacknowledged messages at a time
# → Prevents a consumer from being overwhelmed

# In Kafka: a consumer group with partition assignment
# → Each partition is assigned to only 1 consumer in the group
# → New consumer → rebalance → partitions are reassigned
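
The Kafka side of this pattern, where each partition has exactly one owner within a group, can be sketched as a simple round-robin assignment. This is an illustration under simplifying assumptions: Kafka's real assignors (range, round-robin, sticky) are more involved, and the `assign` function and consumer names are hypothetical.

```python
def assign(partitions, consumers):
    """Round-robin assignment: each partition gets exactly one owner."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

partitions = [0, 1, 2]

before = assign(partitions, ["c1", "c2"])
after = assign(partitions, ["c1", "c2", "c3"])       # a consumer joins: rebalance
over = assign(partitions, ["c1", "c2", "c3", "c4"])  # more consumers than partitions

print(before)  # {'c1': [0, 2], 'c2': [1]}
print(after)   # {'c1': [0], 'c2': [1], 'c3': [2]}
print(over)    # {'c1': [0], 'c2': [1], 'c3': [2], 'c4': []}
```

The last case shows a practical limit: within one group, consumers beyond the partition count sit idle, so the partition count caps the parallelism.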

5. Dead Letter Queue (DLQ)

A dead letter queue is a dedicated queue that holds messages that failed to be processed. Without a DLQ, a failing message can get stuck in the main queue and be retried indefinitely (a poison message), wasting resources.

Text
# Dead Letter Queue Flow:

# 1. A message enters the main queue
# 2. The consumer processes it → FAILS (exception)
# 3. The message is rejected (basic.nack) or its TTL expires
# 4. The message is moved to the Dead Letter Queue
# 5. The DLQ can be monitored, reprocessed, or analyzed

# Reasons a message ends up in the DLQ:
# - The message is rejected (basic.nack/basic.reject) with requeue=false
# - Its TTL (Time-To-Live) expires
# - The queue reaches its max length
# - The retry count exceeds the limit (a delivery limit on quorum queues,
#   or a cap enforced by the application)

# ┌──────────┐     ┌──────────────┐     ┌───────────────┐
# │ Producer │────→│ Main Queue   │────→│ Consumer      │
# └──────────┘     │ (retry ≤3x)  │     │ (process msg) │
#                  └──────┬───────┘     └───────┬───────┘
#                         │                     │
#                         │ (TTL/reject)        │ (failed 3x)
#                         ▼                     ▼
#                  ┌───────────────────────────────┐
#                  │     Dead Letter Queue (DLQ)   │
#                  └──────────────┬────────────────┘
#                                 │
#                                 ▼
#                  ┌──────────────────────────────┐
#                  │  DLQ Consumer / Monitor      │
#                  │  - Log error to Sentry       │
#                  │  - Alert ops team            │
#                  │  - Manual reprocess          │
#                  │  - Fix code → requeue        │
#                  └──────────────────────────────┘
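
The flow above can be simulated in memory to see how a poison message is isolated after a bounded number of attempts. This is a sketch, not broker code: the retry cap `MAX_ATTEMPTS`, the `run` function, and the sample `handler` are assumptions chosen to match the "retry ≤3x" label in the diagram.

```python
from collections import deque

MAX_ATTEMPTS = 3  # assumed retry cap, matching the diagram

def run(messages, handler):
    main_queue = deque((m, 0) for m in messages)  # (message, attempts so far)
    done, dead_letters = [], []
    while main_queue:
        msg, attempts = main_queue.popleft()
        try:
            handler(msg)
            done.append(msg)
        except Exception as exc:
            attempts += 1
            if attempts >= MAX_ATTEMPTS:
                # Give up: park the message with context for later analysis.
                dead_letters.append(
                    {"message": msg, "attempts": attempts, "error": str(exc)}
                )
            else:
                main_queue.append((msg, attempts))  # requeue for another try
    return done, dead_letters

def handler(msg):
    # Hypothetical handler that can never process the "poison" payload.
    if msg == "poison":
        raise ValueError("cannot parse payload")

done, dead_letters = run(["ok-1", "poison", "ok-2"], handler)
print(done)          # ['ok-1', 'ok-2']
print(dead_letters)  # [{'message': 'poison', 'attempts': 3, 'error': 'cannot parse payload'}]
```

The healthy messages are still processed. The poison message no longer blocks or loops; it ends up in the dead letter list with enough context to investigate.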

DLQ Implementation in RabbitMQ

Python
import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# 1. Declare the dead letter exchange and queue
channel.exchange_declare(exchange='dlx', exchange_type='direct', durable=True)
channel.queue_declare(queue='dead_letter_queue', durable=True)
channel.queue_bind(queue='dead_letter_queue', exchange='dlx', routing_key='failed')

# 2. Declare the main queue with its DLQ configuration
channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx',         # Dead letter exchange
        'x-dead-letter-routing-key': 'failed',    # Routing key used when dead-lettering
        'x-message-ttl': 300000,                  # TTL: 5 minutes
        'x-max-length': 10000,                    # Max queue length
        'x-max-priority': 10                      # Priority support
    }
)

# 3. Consumer with retry logic
# TemporaryError, PermanentError and handle_order are application-specific
# placeholders; define them to match your own failure modes.
class TemporaryError(Exception):
    pass

class PermanentError(Exception):
    pass

def process_order(ch, method, properties, body):
    try:
        order = json.loads(body)
        # Processing step that may fail
        result = handle_order(order)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(f"Order {order['id']} processed successfully")

    except TemporaryError:
        # Transient error: requeue so the message is tried again.
        # Note: requeue=True redelivers immediately and without a limit.
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=True
        )

    except PermanentError as e:
        # Permanent error: send to the DLQ
        ch.basic_nack(
            delivery_tag=method.delivery_tag,
            requeue=False  # Dead-lettered because requeue=False
        )
        print(f"Order sent to DLQ: {e}")

# 4. DLQ monitor: handle messages that failed
def handle_dead_letter(ch, method, properties, body):
    failed_msg = json.loads(body)
    headers = properties.headers or {}

    # How many times the message has been dead-lettered (from x-death)
    deaths = headers.get('x-death') or [{}]
    retry_count = deaths[0].get('count', 0)

    print(f"DLQ Message: {failed_msg}")
    print(f"Retry count: {retry_count}")
    print(f"Original exchange: {headers.get('x-first-death-exchange')}")

    # Persist for investigation (save_to_error_log is a placeholder)
    save_to_error_log(failed_msg, retry_count)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Register the main consumer too, otherwise order_processing is never drained
channel.basic_consume(queue='order_processing', on_message_callback=process_order)
channel.basic_consume(queue='dead_letter_queue', on_message_callback=handle_dead_letter)
channel.start_consuming()
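
Reading the `x-death` header deserves some care, because it is a list with one entry per queue and reason, and it is only updated when a message is dead-lettered, not when it is requeued. The helper below is a sketch that sums the counts for one queue and reason. The `death_count` name and the sample headers are hypothetical, and the exact header contents can vary with the broker version.

```python
def death_count(headers, queue_name, reason="rejected"):
    """Sum the x-death counts recorded for one queue and one reason."""
    deaths = (headers or {}).get("x-death") or []
    return sum(
        int(entry.get("count", 0))
        for entry in deaths
        if entry.get("queue") == queue_name and entry.get("reason") == reason
    )

# Hand-written sample, shaped like the x-death header RabbitMQ attaches.
sample_headers = {
    "x-death": [
        {"queue": "order_processing", "reason": "rejected", "count": 2,
         "exchange": "", "routing-keys": ["order_processing"]},
        {"queue": "order_processing", "reason": "expired", "count": 1,
         "exchange": "", "routing-keys": ["order_processing"]},
    ]
}

print(death_count(sample_headers, "order_processing"))             # 2
print(death_count(sample_headers, "order_processing", "expired"))  # 1
print(death_count(None, "order_processing"))                       # 0
```

A consumer could compare this count against a retry cap and stop republishing once the cap is reached, which bounds retries in a way that `requeue=True` alone does not.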

6. Exactly-Once Processing

Exactly-once processing ensures that every message is processed exactly once: no more (duplicates) and no less (misses). It is one of the hardest problems in distributed messaging.

Text
# Delivery Semantics:

# AT-LEAST-ONCE (most common):
# → The message is GUARANTEED to arrive, but may be DUPLICATED
# → The consumer ACKs after processing, so the message can be redelivered
# → The consumer MUST be idempotent (processing twice = same result)
#
# Producer ──→ Queue ──→ Consumer
#                        (fails without ACK)
#                    ──→ Consumer (retry → DUPLICATE possible!)
#
# AT-MOST-ONCE:
# → The message is sent once; if it is lost, it stays lost
# → Fire-and-forget, no retry
# → Suitable for telemetry/logging that can tolerate loss

# EXACTLY-ONCE (ideal but hard):
# → Every message is processed EXACTLY once
# → Requires a combination of mechanisms:
#   1. Idempotent producer (prevents duplicates on publish)
#   2. Transactional publish (atomic write)
#   3. Consumer offset commit inside the transaction

# Kafka Exactly-Once (0.11+):
# Producer → enable.idempotence=true → idempotent producer
# Consumer → read_committed isolation level
# Transaction → producer.beginTransaction() → send + commit offset → commitTransaction()
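
The difference between the first two semantics comes down to when the acknowledgement happens relative to processing. The simulation below is a sketch with a hypothetical `deliver` function: one consumer crashes once while handling a chosen message, and the outcome depends only on whether it acknowledged before or after doing the work.

```python
def deliver(messages, crash_on, ack_before_processing):
    """Simulate one consumer that crashes once while handling `crash_on`."""
    pending = list(messages)   # unacknowledged messages held by the broker
    processed = []             # side effects that actually happened
    crashed = False
    while pending:
        msg = pending[0]
        if ack_before_processing:
            pending.pop(0)     # at-most-once: ack first, then process
        if msg == crash_on and not crashed:
            crashed = True
            if not ack_before_processing:
                # The side effect happened, but the crash came before the ack.
                processed.append(msg)
            continue           # consumer restarts and asks the broker again
        processed.append(msg)
        if not ack_before_processing:
            pending.pop(0)     # at-least-once: ack only after processing
    return processed

at_most_once = deliver(["m1", "m2", "m3"], crash_on="m2", ack_before_processing=True)
at_least_once = deliver(["m1", "m2", "m3"], crash_on="m2", ack_before_processing=False)

print(at_most_once)   # ['m1', 'm3']             -> m2 was lost
print(at_least_once)  # ['m1', 'm2', 'm2', 'm3'] -> m2 was processed twice
```

Neither ordering gives exactly-once on its own, which is why at-least-once delivery is usually paired with an idempotent consumer.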

Idempotent Consumer Pattern

Python
import hashlib
import json
import redis

r = redis.Redis(host='localhost', port=6379)

def idempotent_consumer(message):
    """Consumer that is safe against duplicate processing."""

    # Build a unique ID for the message
    message_id = message.get('id') or hashlib.md5(
        json.dumps(message, sort_keys=True).encode()
    ).hexdigest()

    # Skip if it has already been processed
    if r.exists(f"processed:{message_id}"):
        print(f"Message {message_id} already processed, skipping")
        return True  # Already processed

    # Take a lock to prevent concurrent processing
    lock_key = f"processing:{message_id}"
    if not r.set(lock_key, "1", nx=True, ex=300):
        print(f"Message {message_id} being processed by another worker")
        return False  # Another worker is processing

    # Re-check after taking the lock: another worker may have finished
    # between the first check and the lock acquisition
    if r.exists(f"processed:{message_id}"):
        r.delete(lock_key)
        return True

    try:
        # Process the message (do_actual_processing is a placeholder)
        result = do_actual_processing(message)

        # Mark as processed (TTL: 24 hours)
        r.setex(f"processed:{message_id}", 86400, "1")

        # Store the result for auditing
        r.setex(f"result:{message_id}", 86400, json.dumps(result))

        print(f"Message {message_id} processed successfully")
        return True

    finally:
        # Always release the lock, so a failed message can be retried
        r.delete(lock_key)
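
The Redis version above records the "processed" marker separately from the side effect, so a crash between the two can still cause a repeat. One alternative, sketched here as a different technique rather than the approach used above, is to store the message ID and the business change in the same database transaction. The example uses the standard-library `sqlite3` module with an in-memory database; the table names, the `apply_deposit` function, and the message shape are all hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY)")
conn.execute("CREATE TABLE accounts (id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)")
conn.execute("INSERT INTO accounts (id, balance) VALUES (1, 0)")
conn.commit()

def apply_deposit(conn, message):
    """Apply a deposit once; return False if the message was seen before."""
    try:
        with conn:  # one transaction: commit on success, roll back on error
            conn.execute(
                "INSERT INTO processed (message_id) VALUES (?)",
                (message["id"],),
            )
            conn.execute(
                "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                (message["amount"], message["account_id"]),
            )
        return True
    except sqlite3.IntegrityError:
        # Duplicate message_id: the primary key rejected the insert.
        return False

msg = {"id": "dep-1", "account_id": 1, "amount": 1000}
first = apply_deposit(conn, msg)
second = apply_deposit(conn, msg)  # simulated redelivery of the same message
balance = conn.execute("SELECT balance FROM accounts WHERE id = 1").fetchone()[0]

print(first, second, balance)  # True False 1000
```

Because the marker and the balance update commit or roll back together, a redelivered message cannot apply the deposit twice. This only works when the side effect lives in the same database as the marker.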

7. Other Messaging Patterns

Request-Reply Pattern

Text
# Request-Reply over Message Queue:
#
# 1. The producer sends a request to the queue with a correlation_id
# 2. The producer sets reply_to to the name of its reply queue
# 3. The consumer processes the request and sends the response to the reply_to queue
# 4. The producer matches the response by the same correlation_id

# ┌──────────┐                      ┌──────────┐
# │ Producer │──→ [request queue] ─→│ Consumer │
# │          │                      │          │
# │          │←── [reply queue] ←───│          │
# └──────────┘                      └──────────┘

# Use: microservice RPC over MQ
# Advantages: loose coupling, built-in retry
# Drawback: slower than direct HTTP
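
The four steps can be sketched in-process, assuming standard `queue.Queue` objects in place of broker queues and a thread in place of the remote consumer. The `server` and `call` functions are hypothetical; the point is how `correlation_id` and `reply_to` travel with the request.

```python
import queue
import threading
import uuid

request_queue = queue.Queue()
reply_queue = queue.Queue()   # the caller's private reply queue

def server():
    # Consumer: handle one request and send the result to its reply_to queue.
    req = request_queue.get()
    result = req["body"] ** 2
    req["reply_to"].put({"correlation_id": req["correlation_id"], "body": result})

def call(n, timeout=2.0):
    correlation_id = str(uuid.uuid4())
    request_queue.put(
        {"correlation_id": correlation_id, "reply_to": reply_queue, "body": n}
    )
    while True:
        reply = reply_queue.get(timeout=timeout)  # raises queue.Empty on timeout
        if reply["correlation_id"] == correlation_id:
            return reply["body"]
        # Otherwise it is a stale reply from an earlier call: ignore it.

threading.Thread(target=server, daemon=True).start()
answer = call(7)
print(answer)  # 49
```

The timeout matters in practice: unlike an HTTP call, nothing tells the caller that the consumer is down, so the caller has to decide how long to wait.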

Event Sourcing Pattern

Text
# Event Sourcing: store EVERY event, not just the final state

# Traditional (state-based):
# Account: { id: 1, balance: 800 }
# → Only the final balance is known, not its history

# Event Sourcing:
# Events:
# 1. AccountCreated { id: 1, balance: 0 }
# 2. MoneyDeposited { id: 1, amount: 1000 }
# 3. MoneyWithdrawn { id: 1, amount: 200 }
# → Shows EXACTLY how the balance of 800 was reached
# → Events can be replayed to rebuild state
# → Events can be projected into different read models

# Kafka is well suited to event sourcing because:
# - Log-based storage (events are kept for as long as retention is configured, including indefinitely)
# - Ordered per partition
# - Consumers can replay from a chosen offset
# - Compacted topics keep only the latest event per key (useful for snapshots, not for full history)
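
The account example above can be made concrete by folding the three events into a state. This is a minimal sketch; the `apply_event` and `replay` helpers and the tuple-based event encoding are assumptions made for the illustration.

```python
def apply_event(state, event):
    """Return the new state after applying one event."""
    kind, data = event
    if kind == "AccountCreated":
        return {"id": data["id"], "balance": data["balance"]}
    if kind == "MoneyDeposited":
        return {**state, "balance": state["balance"] + data["amount"]}
    if kind == "MoneyWithdrawn":
        return {**state, "balance": state["balance"] - data["amount"]}
    raise ValueError(f"unknown event type: {kind}")

def replay(events, upto=None):
    """Rebuild state from the first `upto` events (all of them by default)."""
    state = None
    for event in events[:upto]:
        state = apply_event(state, event)
    return state

events = [
    ("AccountCreated", {"id": 1, "balance": 0}),
    ("MoneyDeposited", {"id": 1, "amount": 1000}),
    ("MoneyWithdrawn", {"id": 1, "amount": 200}),
]

current = replay(events)
after_deposit = replay(events, upto=2)

print(current)        # {'id': 1, 'balance': 800}
print(after_deposit)  # {'id': 1, 'balance': 1000}
```

Replaying a prefix of the log answers "what was the state at that point", which a state-based record cannot do.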

8. Complete Implementation

RabbitMQ and Kafka Docker Setup

YAML
# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"     # AMQP
      - "15672:15672"   # Management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
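    environment:
      # The next entries are settings a single-node KRaft broker typically needs.
      # CLUSTER_ID is an example value; generate your own with `kafka-storage random-uuid`.
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168  # 7 days
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1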
    volumes:
      - kafka_data:/var/lib/kafka/data

volumes:
  rabbitmq_data:
  kafka_data:

Comprehension Quiz

Question 1: What is the main difference between RabbitMQ and Kafka?

a) RabbitMQ is queue-based (push), Kafka is log-based (pull)
b) They are the same
c) Kafka is slower
d) RabbitMQ does not support pub/sub

Question 2: What is the purpose of a dead letter queue?

a) To store messages that were processed successfully
b) To hold messages that failed to be processed
c) To encrypt messages
d) To compress messages

Question 3: Which delivery semantic is most commonly used?

a) At-most-once
b) At-least-once
c) Exactly-once
d) None

Question 4: Why is Kafka well suited to event sourcing?

a) Because Kafka deletes messages after ACK
b) Because Kafka keeps events durably in a log
c) Because Kafka does not support replay
d) Because Kafka is only for batch processing

Question 5: What makes a consumer idempotent?

a) Processing messages faster
b) Ensuring that processing a message several times still produces the same result
c) Receiving only 1 message
d) Never encountering an error