Database

Redis Streams untuk Event Sourcing

Tutorial komprehensif Redis Streams — data structure untuk event sourcing dan messaging. Pelajari XADD, XREAD, consumer groups, pending entries, XCLAIM, dan pola arsitektur event-driven dengan Redis

1. Pengenalan Redis Streams

Redis Streams adalah data structure yang diperkenalkan di Redis 5.0 yang menyediakan fitur append-only log — mirip dengan Apache Kafka tetapi dalam skala yang lebih kecil dan lebih sederhana. Streams memungkinkan Anda menyimpan, mengelola, dan mendistribusikan event secara real-time.

Berbeda dengan pub/sub Redis yang sifatnya fire-and-forget (pesan hilang jika tidak ada subscriber), Streams mengpersistenkan semua event dan mendukung consumer groups — multiple consumers yang bisa memproses event secara paralel dengan acknowledgement dan tracking.

Redis Streams Architecture
📝
Producer
XADD events
ke stream
📋
Stream
Persistent log
ordered events
👥
Consumer Group
Parallel processing
load balancing
ACK & Process
XACK after
successful process

1.1 Keunggulan Redis Streams

2. XADD — Menambahkan Events

Redis CLI — XADD Commands
# XADD: menambahkan event ke stream
# Format: XADD stream_name ID field1 value1 field2 value2 ...

# ID otomatis (* = Redis generate timestamp-sequence)
XADD orders * customer_id 1234 product "Laptop" quantity 1 total 15000000
XADD orders * customer_id 5678 product "Mouse" quantity 2 total 250000

# ID dengan timestamp spesifik
XADD orders 1688000000000-0 event_type "payment" order_id "ORD-001" amount 500000

# ID dengan max-length (auto-trim)
XADD orders MAXLEN ~ 10000 * customer_id 9999 product "Keyboard" quantity 1

# MINID strategy (hapus events lebih lama dari ID tertentu)
XADD orders MINID ~ 1688000000000 * event_type "shipped" order_id "ORD-001"

# Hasil: Redis mengembalikan ID yang di-generate
# "1688012345678-0"

# Memeriksa stream
XLEN orders          # Jumlah entries
XRANGE orders - +    # Semua entries (dari awal ke akhir)
XRANGE orders - + COUNT 5  # 5 entries pertama

2.1 Struktur ID Streams

Setiap entry di Redis Stream memiliki ID dalam format <millisecondsTimestamp>-<sequence>. Sequence di-increment jika ada multiple entries dalam milidetik yang sama, memastikan setiap ID unik.

Python — Producer dengan redis-py
import redis
import json
from datetime import datetime

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Menambahkan event ke stream
def publish_event(stream, event_type, data):
    """Publish event ke Redis Stream."""
    entry = {
        'event_type': event_type,
        'timestamp': datetime.now().isoformat(),
        'data': json.dumps(data)
    }
    event_id = r.xadd(stream, entry, maxlen=10000)
    print(f"Published {event_type} to {stream}: {event_id}")
    return event_id

# Contoh penggunaan
publish_event('orders', 'order_created', {
    'order_id': 'ORD-001',
    'customer_id': 1234,
    'items': [{'product': 'Laptop', 'qty': 1, 'price': 15000000}],
    'total': 15000000
})

publish_event('orders', 'payment_received', {
    'order_id': 'ORD-001',
    'payment_method': 'credit_card',
    'amount': 15000000
})

publish_event('orders', 'order_shipped', {
    'order_id': 'ORD-001',
    'tracking_number': 'JNE-123456',
    'carrier': 'JNE'
})

# Batch insert
for i in range(100):
    r.xadd('events', {
        'type': 'page_view',
        'user_id': str(i % 10),
        'page': f'/page/{i}'
    })

3. XREAD — Membaca Events

Redis CLI — XREAD Commands
# XREAD: membaca events dari stream
# Baca semua events
XREAD COUNT 10 STREAMS orders 0

# Baca events setelah ID tertentu
XREAD COUNT 10 STREAMS orders 1688012345678-0

# BLOCKING read: tunggu jika tidak ada event baru
# (berguna untuk consumer loop)
XREAD BLOCK 5000 COUNT 1 STREAMS orders $

# $ = hanya events yang baru ditambahkan setelah command ini
# 0 = semua events dari awal
# ID spesifik = events setelah ID tersebut

# Baca dari multiple streams sekaligus
XREAD COUNT 5 STREAMS orders events 0 0

# Membaca events terakhir
XREVRANGE orders + - COUNT 5
Python — Consumer Blocking Read
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Simple consumer (tanpa consumer group)
def consume_events_simple(stream, last_id='$'):
    """Consumer loop dengan blocking read."""
    print(f"Listening on stream '{stream}'...")

    while True:
        # Block sampai ada event baru (timeout 5 detik)
        result = r.xread({stream: last_id}, count=10, block=5000)

        if result:
            for stream_name, messages in result:
                for message_id, data in messages:
                    print(f"[{message_id}] {data['event_type']}: {data['data']}")
                    last_id = message_id

# Jalankan consumer
# consume_events_simple('orders')

4. Consumer Groups

Consumer Groups adalah fitur paling powerful di Redis Streams. Dengan consumer group, beberapa consumers bisa memproses events dari stream yang sama secara paralel — Redis mendistribusikan events ke consumers yang berbeda (load balancing) dan memastikan setiap event diproses tepat sekali.

Redis CLI — Consumer Group Commands
# Membuat consumer group
# ID 0 = mulai dari awal stream
XGROUP CREATE orders order-processors 0

# ID $ = mulai dari events baru saja (skip yang lama)
XGROUP CREATE orders notifications-group $

# Membuat consumer group (jika belum ada)
XGROUP CREATE orders analytics-group 0 MKSTREAM

# Membaca sebagai anggota consumer group
# XREADGROUP GROUP group_name consumer_name [COUNT n] [BLOCK ms] STREAMS stream >
# > = ambil events BARU yang belum didistribusikan ke consumer lain
XREADGROUP GROUP order-processors worker-1 COUNT 5 BLOCK 2000 STREAMS orders >

# Membaca ulang pending entries (yang belum di-ACK)
XREADGROUP GROUP order-processors worker-1 COUNT 5 STREAMS orders 0

# Mengkonfirmasi pemrosesan (ACK)
XACK orders order-processors 1688012345678-0

# ACK multiple
XACK orders order-processors 1688012345678-0 1688012345679-0

# Lihat info consumer group
XINFO GROUPS orders
XINFO CONSUMERS orders order-processors
XINFO STREAM orders
Python — Producer-Consumer dengan Consumer Group
import redis
import json
import threading

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM = 'order_events'
GROUP = 'order-processors'

# Setup: buat stream dan consumer group
try:
    r.xgroup_create(STREAM, GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    if 'BUSYGROUP' not in str(e):
        raise

def producer():
    """Mengirim events ke stream."""
    import time
    for i in range(20):
        r.xadd(STREAM, {
            'event_type': 'order_created',
            'order_id': f'ORD-{i:04d}',
            'amount': str((i + 1) * 100000)
        }, maxlen=10000)
        time.sleep(0.5)

def consumer(consumer_name):
    """Membaca dan memproses events dari consumer group."""
    print(f"Consumer '{consumer_name}' started...")

    while True:
        try:
            results = r.xreadgroup(
                GROUPNAME=GROUP,
                consumername=consumer_name,
                streams={STREAM: '>'},
                count=1,
                block=5000
            )

            if not results:
                continue

            for stream_name, messages in results:
                for message_id, data in messages:
                    print(f"[{consumer_name}] Processing {message_id}: "
                          f"{data['event_type']} - {data['order_id']}")

                    # Proses event
                    process_order(data)

                    # ACK setelah berhasil diproses
                    r.xack(STREAM, GROUP, message_id)
                    print(f"[{consumer_name}] ACK'd {message_id}")

        except Exception as e:
            print(f"[{consumer_name}] Error: {e}")

def process_order(data):
    """Simulasi pemrosesan order."""
    import time
    time.sleep(0.1)  # Simulasi kerja

# Jalankan: 1 producer + 3 consumers
# threading.Thread(target=producer, daemon=True).start()
# threading.Thread(target=consumer, args=('worker-1',), daemon=True).start()
# threading.Thread(target=consumer, args=('worker-2',), daemon=True).start()
# threading.Thread(target=consumer, args=('worker-3',), daemon=True).start()
💡 Exactly-Once Semantics

Redis Streams mendukung at-least-once delivery — event bisa dikirim ulang jika consumer crash sebelum XACK. Untuk exactly-once, implementasikan idempotency di consumer (misal: cek apakah order_id sudah diproses sebelumnya) menggunakan Redis SET atau database.

5. Pending Entries & XCLAIM

Setiap event yang dibaca oleh consumer (via XREADGROUP) tapi belum di-ACK akan masuk ke Pending Entries List (PEL). Ini memungkinkan recovery — jika consumer crash, event-nya bisa di-claim oleh consumer lain.

Redis CLI — Pending Entries Management
# XPENDING: melihat entries yang belum di-ACK
XPENDING orders order-processors

# Detail pending entries
XPENDING orders order-processors - + 10

# Pending entries untuk consumer tertentu
XPENDING orders order-processors - + 10 worker-1

# XCLAIM: mengambil alih pending entries dari consumer yang stalled
# Claim entries yang idle > 60 detik (60000 ms)
XCLAIM orders order-processors worker-2 60000 1688012345678-0 1688012345679-0

# XAUTOCLAIM: auto-claim entries yang stalled
# Ambil entries yang idle > 30 detik
XAUTOCLAIM orders order-processors worker-2 30000 0-0 COUNT 10

# XCLAIM with JUSTID (hanya mendapatkan ID, tanpa data)
XCLAIM orders order-processors worker-2 60000 1688012345678-0 JUSTID

# Menghapus entries dari stream
XDEL orders 1688012345678-0

# Menghapus entries yang sudah di-ACK (dengan trimming)
XTRIM orders MAXLEN ~ 5000
XTRIM orders MINID ~ 1688000000000-0
Python — Dead Letter Queue Pattern
import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

STREAM = 'order_events'
GROUP = 'order-processors'
DLQ = 'dead_letter_queue'  # Queue untuk events yang gagal diproses

def claim_and_process_stalled(consumer_name, idle_ms=60000, max_retries=3):
    """Claim stalled entries dan proses dengan retry limit."""
    while True:
        claimed = r.xautoclaim(
            name=STREAM,
            groupname=GROUP,
            consumername=consumer_name,
            min_idle_time=idle_ms,
            start_id='0-0',
            count=10
        )

        if not claimed[1]:  # No entries claimed
            break

        for message_id, data in claimed[1]:
            # Cek berapa kali entry ini sudah di-claim (delivery count)
            pending_info = r.xpending_range(
                STREAM, GROUP,
                min=message_id, max=message_id, count=1
            )

            if pending_info and pending_info[0]['times_delivered'] >= max_retries:
                # Sudah terlalu banyak retry → pindahkan ke DLQ
                r.xadd(DLQ, {
                    'original_id': message_id,
                    'data': json.dumps(data),
                    'reason': 'max_retries_exceeded'
                })
                r.xack(STREAM, GROUP, message_id)
                print(f"Moved {message_id} to DLQ after {max_retries} retries")
            else:
                # Coba proses lagi
                try:
                    process_order(data)
                    r.xack(STREAM, GROUP, message_id)
                    print(f"Successfully claimed and processed {message_id}")
                except Exception as e:
                    print(f"Failed to process {message_id}: {e}")
                    # Tidak ACK → akan di-claim lagi nanti

6. Stream Management

Redis CLI — Stream Info & Management
# XINFO STREAM: informasi lengkap tentang stream
XINFO STREAM orders

# Output:
# length: 1500
# radix-tree-keys: 1
# radix-tree-nodes: 2
# groups: 2
# last-generated-id: 1688012345678-0
# first-entry: 1688000000000-0 {event_type} {order_created} ...
# last-entry: 1688012345678-0 {event_type} {order_shipped} ...

# XINFO GROUPS: semua consumer groups
XINFO GROUPS orders

# XINFO CONSUMERS: consumers dalam group
XINFO CONSUMERS orders order-processors

# XLEN: jumlah entries
XLEN orders

# XTRIM: membatasi panjang stream
# MAXLEN: batas entries (persis)
XTRIM orders MAXLEN 10000
# MINID: hapus entries sebelum ID tertentu
XTRIM orders MINID 1688000000000-0

# ~ (approximate): lebih efisien, Redis trim saat waktu idle
XTRIM orders MAXLEN ~ 10000

# XRANGE: membaca range entries
XRANGE orders 1688000000000 1688010000000  # Filter berdasarkan timestamp
XRANGE orders - + COUNT 10                  # 10 entries pertama
XREVRANGE orders + - COUNT 10               # 10 entries terakhir (reverse)

# Menghapus consumer dari group
XGROUP DELCONSUMER orders order-processors worker-1

# Menghapus consumer group
XGROUP DESTROY orders order-processors

7. Event Sourcing Patterns

7.1 CQRS (Command Query Responsibility Segregation)

Python — CQRS dengan Redis Streams
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

# Command side: menulis events
class OrderService:
    def __init__(self):
        self.stream = 'order_events'

    def create_order(self, order_data):
        event = {
            'event_type': 'order_created',
            'order_id': order_data['order_id'],
            'customer_id': str(order_data['customer_id']),
            'data': json.dumps(order_data)
        }
        event_id = r.xadd(self.stream, event)
        return event_id

    def update_status(self, order_id, new_status):
        event = {
            'event_type': 'order_status_changed',
            'order_id': order_id,
            'new_status': new_status,
            'timestamp': datetime.now().isoformat()
        }
        return r.xadd(self.stream, event)

    def add_item(self, order_id, item):
        event = {
            'event_type': 'item_added',
            'order_id': order_id,
            'data': json.dumps(item)
        }
        return r.xadd(self.stream, event)

# Query side: membangun read model dari events
class OrderProjection:
    def __init__(self):
        self.orders = {}  # Bisa juga Redis Hash atau database

    def rebuild_from_events(self, stream='order_events'):
        """Rebuild state dari event log."""
        last_id = '0'
        while True:
            events = r.xread({stream: last_id}, count=100)
            if not events:
                break
            for _, messages in events:
                for event_id, data in messages:
                    self.apply_event(data)
                    last_id = event_id

    def apply_event(self, event):
        if event['event_type'] == 'order_created':
            order = json.loads(event['data'])
            order['status'] = 'created'
            order['items'] = []
            self.orders[event['order_id']] = order

        elif event['event_type'] == 'order_status_changed':
            order_id = event['order_id']
            if order_id in self.orders:
                self.orders[order_id]['status'] = event['new_status']

        elif event['event_type'] == 'item_added':
            order_id = event['order_id']
            if order_id in self.orders:
                item = json.loads(event['data'])
                self.orders[order_id]['items'].append(item)

    def get_order(self, order_id):
        return self.orders.get(order_id)

8. Best Practices & Monitoring

PraktikPenjelasan
Gunakan MAXLEN/MINIDBatasi ukuran stream untuk menghindari memory exhaustion
ACK segera setelah prosesJangan tunda XACK — event tetap di PEL sampai ACK
Implementasi Dead Letter QueuePindahkan events yang gagal terus ke DLQ
Monitor PELPantau pending entries — jika terlalu banyak, ada masalah
Gunakan XAUTOCLAIMAuto-claim stalled entries secara berkala
Consumer Name yang UnikGunakan hostname+PID atau UUID untuk consumer name
Avoid Huge EntriesJangan simpan data besar di stream — simpan di S3, taruh URL di stream
PersistenceGunakan AOF persistence untuk durability (appendfsync everysec)

9. Quiz: Uji Pemahamanmu!

Setelah membaca tutorial di atas, jawablah 5 pertanyaan berikut:

Pertanyaan 1: Apa perbedaan utama antara Redis Pub/Sub dan Redis Streams?

a) Tidak ada perbedaan
b) Streams lebih lambat
c) Streams mengpersistenkan events dan mendukung consumer groups
d) Pub/Sub mendukung consumer groups

Pertanyaan 2: Apa fungsi dari XACK?

a) Menambahkan event ke stream
b) Mengkonfirmasi bahwa event telah diproses
c) Membaca events dari stream
d) Menghapus stream

Pertanyaan 3: Apa itu Pending Entries List (PEL)?

a) Daftar events yang sudah diproses
b) Daftar events yang dibaca tapi belum di-ACK
c) Daftar consumer yang aktif
d) Daftar streams yang tersedia

Pertanyaan 4: Apa fungsi XCLAIM?

a) Menghapus events yang stalled
b) Mengambil alih pending entries dari consumer lain yang stalled
c) Membuat consumer group baru
d) Mengirim notifikasi ke consumer

Pertanyaan 5: Mengapa MAXLEN penting saat menggunakan XADD?

a) Untuk mempercepat write performance
b) Untuk menghindari stream tumbuh tak terbatas dan menghabiskan memory
c) Untuk mengenkripsi data
d) Untuk mengurutkan events
← Sebelumnya SQL Query Optimization Advanced Selanjutnya → Supabase Realtime Features
🔍 Zoom
100%
🎨 Tema