1. Introduction to Redis Streams
Redis Streams is a data structure introduced in Redis 5.0 that provides an append-only log. It is similar in spirit to Apache Kafka, but smaller in scale and simpler to operate. Streams let you store, manage, and distribute events in real time.
Unlike Redis pub/sub, which is fire-and-forget (a message is lost if no subscriber is connected when it is published), Streams persist every event until it is trimmed or deleted, and they support consumer groups: multiple consumers that process events in parallel, with acknowledgement and delivery tracking.
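[Figure: event flow. Events are appended to the stream, kept as ordered events, load-balanced across consumers, and acknowledged after successful processing.]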
1.1 Advantages of Redis Streams
- Persistent: events are stored in Redis and can be replayed
- Consumer groups: multiple consumers with automatic load balancing
- Ordered: entries are ordered by ID (millisecond timestamp plus sequence number)
- Low latency: in-memory, typically sub-millisecond
- Acknowledgement: a delivered event stays in the group's pending list until it is ACKed
- Trimming: the stream length can be capped to save memory
2. XADD: Adding Events
# XADD: append an event to a stream
# Format: XADD stream_name ID field1 value1 field2 value2 ...

# Auto-generated ID (* = Redis generates <timestamp>-<sequence>)
XADD orders * customer_id 1234 product "Laptop" quantity 1 total 15000000
XADD orders * customer_id 5678 product "Mouse" quantity 2 total 250000

# Explicit ID. It must be greater than the last ID in the stream, so this
# call is rejected if entries with newer auto-generated IDs already exist.
XADD orders 1688000000000-0 event_type "payment" order_id "ORD-001" amount 500000

# Cap the stream length while adding (approximate auto-trim)
XADD orders MAXLEN ~ 10000 * customer_id 9999 product "Keyboard" quantity 1

# MINID strategy: evict entries with IDs lower than the threshold (Redis 6.2+)
XADD orders MINID ~ 1688000000000 * event_type "shipped" order_id "ORD-001"

# Result: Redis returns the ID of the new entry, for example
# "1688012345678-0"

# Inspect the stream
XLEN orders                 # Number of entries
XRANGE orders - +           # All entries, oldest to newest
XRANGE orders - + COUNT 5   # First 5 entries
2.1 Stream ID Structure
Each entry in a Redis Stream has an ID of the form <millisecondsTimestamp>-<sequence>. The sequence number is incremented when several entries are added within the same millisecond, so every ID is unique.
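The ID format can be made concrete with a small pure-Python sketch. The helper names `parse_stream_id` and `next_id` are hypothetical and not part of Redis or redis-py; `next_id` only imitates how an auto-generated ID follows the previous one, assuming the documented rule that Redis reuses the last timestamp and bumps the sequence when the clock has not advanced.

```python
from typing import Tuple


def parse_stream_id(entry_id: str) -> Tuple[int, int]:
    """Split '<ms>-<seq>' into two integers (a missing sequence counts as 0)."""
    ms, _, seq = entry_id.partition("-")
    return int(ms), int(seq or 0)


def next_id(last_id: str, now_ms: int) -> str:
    """Imitate the ID that would follow last_id at time now_ms (illustrative only)."""
    last_ms, last_seq = parse_stream_id(last_id)
    if now_ms > last_ms:
        return f"{now_ms}-0"
    # Same millisecond, or the clock moved backwards: keep the timestamp
    # and increment the sequence so IDs stay strictly increasing.
    return f"{last_ms}-{last_seq + 1}"


# IDs must be compared as integer pairs, not as strings:
print("999-0" < "1000-0")                                      # False
print(parse_stream_id("999-0") < parse_stream_id("1000-0"))    # True
```

Comparing the parsed tuples matters whenever application code sorts or checkpoints IDs itself, because plain string comparison gives the wrong order for timestamps of different lengths.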
import json
from datetime import datetime

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


# Add an event to a stream
def publish_event(stream, event_type, data):
    """Publish an event to a Redis Stream."""
    entry = {
        'event_type': event_type,
        'timestamp': datetime.now().isoformat(),
        'data': json.dumps(data),
    }
    # redis-py applies maxlen approximately (MAXLEN ~) by default
    event_id = r.xadd(stream, entry, maxlen=10000)
    print(f"Published {event_type} to {stream}: {event_id}")
    return event_id


# Example usage
publish_event('orders', 'order_created', {
    'order_id': 'ORD-001',
    'customer_id': 1234,
    'items': [{'product': 'Laptop', 'qty': 1, 'price': 15000000}],
    'total': 15000000,
})
publish_event('orders', 'payment_received', {
    'order_id': 'ORD-001',
    'payment_method': 'credit_card',
    'amount': 15000000,
})
publish_event('orders', 'order_shipped', {
    'order_id': 'ORD-001',
    'tracking_number': 'JNE-123456',
    'carrier': 'JNE',
})

# Batch insert: queue the commands in a pipeline and send them in one round trip
pipe = r.pipeline(transaction=False)
for i in range(100):
    pipe.xadd('events', {
        'type': 'page_view',
        'user_id': str(i % 10),
        'page': f'/page/{i}',
    })
pipe.execute()
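Stream fields are flat string pairs, which is why `publish_event` packs the nested payload into a JSON string under `data`. A reader has to undo that step. The following is a minimal sketch of the decoding side; `decode_event` is a hypothetical helper name, and it assumes the client was created with `decode_responses=True` so that field values arrive as `str`.

```python
import json


def decode_event(fields):
    """Turn the flat field map written by publish_event back into Python objects."""
    return {
        'event_type': fields['event_type'],
        'timestamp': fields.get('timestamp'),
        # The nested payload was stored as a JSON string
        'data': json.loads(fields['data']),
    }


# Example with the shape XRANGE/XREAD would hand back for one entry
raw = {
    'event_type': 'order_created',
    'timestamp': '2023-06-29T10:00:00',
    'data': json.dumps({'order_id': 'ORD-001', 'total': 15000000}),
}
event = decode_event(raw)
print(event['data']['total'])  # 15000000
```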
3. XREAD: Reading Events
# XREAD: read events from one or more streams

# Read up to 10 events from the beginning of the stream
XREAD COUNT 10 STREAMS orders 0

# Read events after a specific ID
XREAD COUNT 10 STREAMS orders 1688012345678-0

# Blocking read: wait up to 5000 ms when there is no new event
# (useful in a consumer loop)
XREAD BLOCK 5000 COUNT 1 STREAMS orders $

# $           = only events added after this command is issued
# 0           = all events from the beginning
# specific ID = events after that ID

# Read from several streams at once
XREAD COUNT 5 STREAMS orders events 0 0

# Read the latest events
XREVRANGE orders + - COUNT 5
import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


# Simple consumer (without a consumer group)
def consume_events_simple(stream, last_id='$'):
    """Consumer loop with a blocking read."""
    print(f"Listening on stream '{stream}'...")
    while True:
        # Block until a new event arrives (5 second timeout)
        result = r.xread({stream: last_id}, count=10, block=5000)
        if result:
            for stream_name, messages in result:
                for message_id, data in messages:
                    print(f"[{message_id}] {data['event_type']}: {data['data']}")
                    # Continue after the last event that was seen
                    last_id = message_id


# Run the consumer
# consume_events_simple('orders')
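Because `consume_events_simple` starts at `$`, events published while the consumer is down are skipped after a restart. One possible remedy, sketched here under assumptions, is to store the last processed ID in a plain Redis key and resume from it. The function names and the checkpoint key are hypothetical; `client` is assumed to be a redis-py client created with `decode_responses=True` and the default protocol, where `xread` returns a list of `(stream, messages)` pairs.

```python
def resume_position(client, checkpoint_key, default="0"):
    """Return the saved stream ID, or `default` when nothing was saved yet."""
    return client.get(checkpoint_key) or default


def save_position(client, checkpoint_key, last_id):
    """Remember the last ID that was fully processed."""
    client.set(checkpoint_key, last_id)


def read_batch(client, stream, last_id, count=10, block_ms=5000):
    """Read one batch of entries after last_id; return (events, new_last_id)."""
    result = client.xread({stream: last_id}, count=count, block=block_ms)
    events = []
    for _stream_name, messages in result or []:
        for message_id, fields in messages:
            events.append((message_id, fields))
            last_id = message_id
    return events, last_id
```

A consumer loop would call `resume_position` once at startup, then alternate `read_batch`, its own processing, and `save_position`. Saving only after processing means a crash can replay the last batch, so the handler should tolerate duplicates.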
4. Consumer Groups
Consumer groups are the most powerful feature of Redis Streams. With a consumer group, several consumers can process events from the same stream in parallel: Redis hands each new event to only one consumer in the group (load balancing) and tracks it until it is acknowledged. Delivery is at-least-once, so an event can be delivered again if a consumer fails before acknowledging it.
# Create a consumer group
# ID 0 = start from the beginning of the stream
XGROUP CREATE orders order-processors 0

# ID $ = start from new events only (skip existing ones)
XGROUP CREATE orders notifications-group $

# MKSTREAM also creates the stream if it does not exist yet
XGROUP CREATE orders analytics-group 0 MKSTREAM

# Read as a member of a consumer group
# XREADGROUP GROUP group_name consumer_name [COUNT n] [BLOCK ms] STREAMS stream >
# > = fetch NEW events that have not been delivered to any consumer in the group
XREADGROUP GROUP order-processors worker-1 COUNT 5 BLOCK 2000 STREAMS orders >

# Re-read this consumer's own pending entries (delivered but not yet ACKed)
XREADGROUP GROUP order-processors worker-1 COUNT 5 STREAMS orders 0

# Acknowledge processing (ACK)
XACK orders order-processors 1688012345678-0

# ACK several entries
XACK orders order-processors 1688012345678-0 1688012345679-0

# Inspect the consumer group
XINFO GROUPS orders
XINFO CONSUMERS orders order-processors
XINFO STREAM orders
import threading
import time

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM = 'order_events'
GROUP = 'order-processors'

# Setup: create the stream and the consumer group
try:
    r.xgroup_create(STREAM, GROUP, id='0', mkstream=True)
except redis.exceptions.ResponseError as e:
    # BUSYGROUP means the group already exists
    if 'BUSYGROUP' not in str(e):
        raise


def producer():
    """Send events to the stream."""
    for i in range(20):
        r.xadd(STREAM, {
            'event_type': 'order_created',
            'order_id': f'ORD-{i:04d}',
            'amount': str((i + 1) * 100000),
        }, maxlen=10000)
        time.sleep(0.5)


def consumer(consumer_name):
    """Read and process events as a member of the consumer group."""
    print(f"Consumer '{consumer_name}' started...")
    while True:
        try:
            results = r.xreadgroup(
                groupname=GROUP,  # the keyword is lowercase in redis-py
                consumername=consumer_name,
                streams={STREAM: '>'},
                count=1,
                block=5000,
            )
            if not results:
                continue
            for stream_name, messages in results:
                for message_id, data in messages:
                    print(f"[{consumer_name}] Processing {message_id}: "
                          f"{data['event_type']} - {data['order_id']}")
                    # Process the event
                    process_order(data)
                    # ACK only after processing succeeded
                    r.xack(STREAM, GROUP, message_id)
                    print(f"[{consumer_name}] ACK'd {message_id}")
        except Exception as e:
            print(f"[{consumer_name}] Error: {e}")
            time.sleep(1)  # Back off briefly instead of retrying in a tight loop


def process_order(data):
    """Simulate order processing."""
    time.sleep(0.1)  # Simulated work


# Run: 1 producer + 3 consumers
# threading.Thread(target=producer, daemon=True).start()
# threading.Thread(target=consumer, args=('worker-1',), daemon=True).start()
# threading.Thread(target=consumer, args=('worker-2',), daemon=True).start()
# threading.Thread(target=consumer, args=('worker-3',), daemon=True).start()
Redis Streams provide at-least-once delivery: an event can be delivered again if a consumer crashes before XACK. To get exactly-once effects, make the consumer idempotent (for example, check whether the order_id has already been processed) using a Redis SET or a database.
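One way to approximate the idempotency check is a marker key written with `SET ... NX`. The sketch below is illustrative only: `process_once`, the `processed:` key layout, and the `dedup_id` parameter are hypothetical, and `client` is assumed to be a redis-py client. The caller chooses `dedup_id`, for example the stream entry ID, or the order_id combined with the event type.

```python
def process_once(client, stream, group, message_id, fields, handler,
                 dedup_id, ttl_seconds=86400):
    """Run handler for a given dedup_id only once, then ACK the entry.

    Returns True if the handler ran, False if the event was a duplicate.
    """
    dedup_key = f"processed:{stream}:{dedup_id}"  # hypothetical key layout
    # SET key value NX EX ttl: succeeds only for the first caller
    first_time = client.set(dedup_key, message_id, nx=True, ex=ttl_seconds)
    if first_time:
        try:
            handler(fields)
        except Exception:
            # Remove the marker so that a later redelivery can retry
            client.delete(dedup_key)
            raise
    # Duplicates are ACKed as well so they leave the pending list
    client.xack(stream, group, message_id)
    return bool(first_time)
```

The marker is written before the handler runs, so a crash between the two steps leaves the marker behind and the redelivered event is skipped until the TTL expires. A stricter design would record the marker in the same transaction as the handler's side effect, typically in the database that the handler writes to.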
5. Pending Entries & XCLAIM
Every event that a consumer has read through XREADGROUP but not yet acknowledged is recorded in the group's Pending Entries List (PEL). This enables recovery: if a consumer crashes, its events can be claimed by another consumer.
# XPENDING: show entries that have not been ACKed (summary form)
XPENDING orders order-processors

# Pending entry details
XPENDING orders order-processors - + 10

# Pending entries for a specific consumer
XPENDING orders order-processors - + 10 worker-1

# XCLAIM: take over pending entries from a stalled consumer
# Claim the listed entries if they have been idle for more than 60 seconds (60000 ms)
XCLAIM orders order-processors worker-2 60000 1688012345678-0 1688012345679-0

# XAUTOCLAIM: scan the PEL and claim stalled entries (Redis 6.2+)
# Take entries that have been idle for more than 30 seconds
XAUTOCLAIM orders order-processors worker-2 30000 0-0 COUNT 10

# XCLAIM with JUSTID (return only the IDs, without the data)
XCLAIM orders order-processors worker-2 60000 1688012345678-0 JUSTID

# Delete entries from the stream
XDEL orders 1688012345678-0

# Trim the stream by length or by minimum ID
# (trimming does not check whether an entry has been ACKed)
XTRIM orders MAXLEN ~ 5000
XTRIM orders MINID ~ 1688000000000-0
import json
import time

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
STREAM = 'order_events'
GROUP = 'order-processors'
DLQ = 'dead_letter_queue'  # Stream for events that could not be processed


def process_order(data):
    """Stand-in for the order handler from section 4."""
    time.sleep(0.1)


def claim_and_process_stalled(consumer_name, idle_ms=60000, max_retries=3):
    """Claim stalled entries and process them with a retry limit."""
    cursor = '0-0'
    while True:
        claimed = r.xautoclaim(
            name=STREAM,
            groupname=GROUP,
            consumername=consumer_name,
            min_idle_time=idle_ms,
            start_id=cursor,
            count=10,
        )
        # claimed[0] is the cursor for the next call, claimed[1] the claimed entries
        cursor, entries = claimed[0], claimed[1]
        for message_id, data in entries:
            # Check how many times this entry has been delivered
            pending_info = r.xpending_range(
                STREAM, GROUP,
                min=message_id, max=message_id, count=1,
            )
            if pending_info and pending_info[0]['times_delivered'] >= max_retries:
                # Too many retries: move the entry to the DLQ
                r.xadd(DLQ, {
                    'original_id': message_id,
                    'data': json.dumps(data),
                    'reason': 'max_retries_exceeded',
                })
                r.xack(STREAM, GROUP, message_id)
                print(f"Moved {message_id} to DLQ after {max_retries} retries")
            else:
                # Try to process it again
                try:
                    process_order(data)
                    r.xack(STREAM, GROUP, message_id)
                    print(f"Successfully claimed and processed {message_id}")
                except Exception as e:
                    print(f"Failed to process {message_id}: {e}")
                    # No ACK: the entry will be claimed again later
        if cursor == '0-0':  # The whole PEL has been scanned
            break
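The PEL described in this section can also be summarized for monitoring. The sketch below assumes redis-py's `xpending(name, groupname)` summary form, which returns a dict with the keys `pending`, `min`, `max`, and `consumers`. The function name `pel_report` and the threshold value are hypothetical.

```python
def pel_report(client, stream, group, warn_threshold=1000):
    """Summarize a group's pending entries and flag a backlog."""
    summary = client.xpending(stream, group)
    per_consumer = {
        c['name']: c['pending'] for c in (summary.get('consumers') or [])
    }
    total = summary['pending']
    return {
        'total': total,
        'oldest_id': summary['min'],   # None when nothing is pending
        'per_consumer': per_consumer,
        'alert': total > warn_threshold,
    }
```

Calling this periodically and exporting `total` to a metrics system gives an early signal that consumers are crashing or falling behind. A single consumer holding most of the pending entries usually points at one stalled worker rather than at overall load.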
6. Stream Management
# XINFO STREAM: detailed information about a stream
XINFO STREAM orders
# Output:
# length: 1500
# radix-tree-keys: 1
# radix-tree-nodes: 2
# groups: 2
# last-generated-id: 1688012345678-0
# first-entry: 1688000000000-0 {event_type} {order_created} ...
# last-entry: 1688012345678-0 {event_type} {order_shipped} ...
# XINFO GROUPS: all consumer groups
XINFO GROUPS orders
# XINFO CONSUMERS: consumers in a group
XINFO CONSUMERS orders order-processors
# XLEN: number of entries
XLEN orders
# XTRIM: cap the stream length
# MAXLEN: maximum number of entries (exact)
XTRIM orders MAXLEN 10000
# MINID: remove entries with IDs lower than the given ID
XTRIM orders MINID 1688000000000-0
# ~ (approximate): more efficient; Redis only evicts whole internal nodes,
# so the stream may keep slightly more entries than the threshold
XTRIM orders MAXLEN ~ 10000
# XRANGE: read a range of entries
XRANGE orders 1688000000000 1688010000000  # Filter by timestamp
XRANGE orders - + COUNT 10                 # First 10 entries
XREVRANGE orders + - COUNT 10              # Last 10 entries (reverse order)
# Remove a consumer from a group
XGROUP DELCONSUMER orders order-processors worker-1
# Destroy a consumer group
XGROUP DESTROY orders order-processors
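Since the first part of a stream ID is a millisecond timestamp, MINID trimming can express a time-based retention policy. The following is a minimal sketch with hypothetical helper names. It assumes a redis-py version whose `xtrim` accepts `minid` and `approximate`, and a server that supports MINID (Redis 6.2+).

```python
import time


def minid_for_retention(retention_seconds, now=None):
    """Build the smallest stream ID that should be kept."""
    now = time.time() if now is None else now
    cutoff_ms = int((now - retention_seconds) * 1000)
    return f"{max(cutoff_ms, 0)}-0"


def trim_by_age(client, stream, retention_seconds):
    """Issue XTRIM <stream> MINID ~ <cutoff>; returns the number of evicted entries."""
    return client.xtrim(
        stream,
        minid=minid_for_retention(retention_seconds),
        approximate=True,
    )


# Keep one hour of history, evaluated at a fixed point in time
print(minid_for_retention(3600, now=1688003600.0))  # 1688000000000-0
```

Running `trim_by_age` from a periodic job keeps memory bounded by time rather than by entry count. Entries that are still pending in a consumer group are trimmed as well, so the retention window should be comfortably longer than the slowest expected processing time.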
7. Event Sourcing Patterns
7.1 CQRS (Command Query Responsibility Segregation)
import json
from datetime import datetime

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)


# Command side: write events
class OrderService:
    def __init__(self):
        self.stream = 'order_events'

    def create_order(self, order_data):
        event = {
            'event_type': 'order_created',
            'order_id': order_data['order_id'],
            'customer_id': str(order_data['customer_id']),
            'data': json.dumps(order_data),
        }
        event_id = r.xadd(self.stream, event)
        return event_id

    def update_status(self, order_id, new_status):
        event = {
            'event_type': 'order_status_changed',
            'order_id': order_id,
            'new_status': new_status,
            'timestamp': datetime.now().isoformat(),
        }
        return r.xadd(self.stream, event)

    def add_item(self, order_id, item):
        event = {
            'event_type': 'item_added',
            'order_id': order_id,
            'data': json.dumps(item),
        }
        return r.xadd(self.stream, event)


# Query side: build a read model from events
class OrderProjection:
    def __init__(self):
        self.orders = {}  # Could also be a Redis hash or a database

    def rebuild_from_events(self, stream='order_events'):
        """Rebuild state from the event log."""
        last_id = '0'
        while True:
            # Read the log in pages of 100 entries until it is exhausted
            events = r.xread({stream: last_id}, count=100)
            if not events:
                break
            for _, messages in events:
                for event_id, data in messages:
                    self.apply_event(data)
                    last_id = event_id

    def apply_event(self, event):
        if event['event_type'] == 'order_created':
            order = json.loads(event['data'])
            order['status'] = 'created'
            order['items'] = []
            self.orders[event['order_id']] = order
        elif event['event_type'] == 'order_status_changed':
            order_id = event['order_id']
            if order_id in self.orders:
                self.orders[order_id]['status'] = event['new_status']
        elif event['event_type'] == 'item_added':
            order_id = event['order_id']
            if order_id in self.orders:
                item = json.loads(event['data'])
                self.orders[order_id]['items'].append(item)

    def get_order(self, order_id):
        return self.orders.get(order_id)
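The replay step can be tried without a Redis server by folding a hand-written list of events into a dict. This is a simplified stand-in for the projection logic, not a replacement for it. The standalone `apply_event` function and the sample events are assumptions made for illustration.

```python
import json


def apply_event(orders, event):
    """Fold one event into the read model (a dict keyed by order_id)."""
    kind, order_id = event['event_type'], event['order_id']
    if kind == 'order_created':
        order = json.loads(event['data'])
        order.update(status='created', items=[])
        orders[order_id] = order
    elif order_id in orders and kind == 'order_status_changed':
        orders[order_id]['status'] = event['new_status']
    elif order_id in orders and kind == 'item_added':
        orders[order_id]['items'].append(json.loads(event['data']))
    return orders


# Events in the same flat shape that the command side writes to the stream
events = [
    {'event_type': 'order_created', 'order_id': 'ORD-001',
     'data': json.dumps({'order_id': 'ORD-001', 'customer_id': 1234})},
    {'event_type': 'item_added', 'order_id': 'ORD-001',
     'data': json.dumps({'product': 'Laptop', 'qty': 1})},
    {'event_type': 'order_status_changed', 'order_id': 'ORD-001',
     'new_status': 'shipped'},
]

read_model = {}
for event in events:
    apply_event(read_model, event)

print(read_model['ORD-001']['status'])  # shipped
```

Replaying the same list into a fresh dict always produces the same state, which is the property that lets a read model be dropped and rebuilt from the stream at any time.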
8. Best Practices & Monitoring
| Practice | Explanation |
|---|---|
| Use MAXLEN/MINID | Cap the stream size to avoid memory exhaustion |
| ACK right after processing | Do not delay XACK; an event stays in the PEL until it is ACKed |
| Implement a Dead Letter Queue | Move events that keep failing to a DLQ |
| Monitor the PEL | Watch the number of pending entries; a steadily growing count signals a problem |
| Use XAUTOCLAIM | Auto-claim stalled entries periodically |
| Unique consumer names | Use hostname+PID or a UUID as the consumer name |
| Avoid huge entries | Do not store large payloads in the stream; store them in S3 and put the URL in the stream |
| Persistence | Use AOF persistence for durability (for example appendfsync everysec, which can lose about one second of writes on a crash) |
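The consumer-name recommendation from the table can be sketched with the standard library alone. Both helper names are hypothetical; which variant fits depends on whether a restarted process should reuse its name.

```python
import os
import socket
import uuid


def consumer_name(prefix="worker"):
    """hostname + PID: stable for the lifetime of the process."""
    return f"{prefix}-{socket.gethostname()}-{os.getpid()}"


def random_consumer_name(prefix="worker"):
    """UUID-based name: unique even when hostnames and PIDs repeat, as in containers."""
    return f"{prefix}-{uuid.uuid4().hex}"


print(consumer_name())
print(random_consumer_name())
```

Both variants produce a new name after a restart, so old consumers accumulate in the group. Their pending entries should be claimed first (for example with XAUTOCLAIM), after which the stale names can be removed with XGROUP DELCONSUMER.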
9. Quiz: Test Your Understanding!
After reading the tutorial above, answer the following 5 questions: