1. Apa yang Baru di MQTT v5?
MQTT v5.0 dirilis pada Maret 2019 oleh OASIS, membawa peningkatan signifikan dari versi 3.1.1. Versi ini menambahkan fitur-fitur enterprise yang sebelumnya tidak ada, membuat MQTT lebih cocok untuk deployment skala besar dengan kebutuhan manajemen yang lebih baik.
Perbandingan MQTT v3.1.1 vs v5.0
| Fitur | MQTT v3.1.1 | MQTT v5.0 |
|---|---|---|
| User Properties | Tidak ada | Ya β key-value pairs di setiap packet |
| Shared Subscriptions | Tidak standar | Ya β load balancing bawaan |
| Message Expiry | Tidak | Ya β TTL per message |
| Topic Aliases | Tidak | Ya β numeric alias untuk topic string |
| Reason Codes | Hanya CONNACK | Ya β di semua ACK packet |
| Request/Response | Tidak native | Ya β Response Topic + Correlation Data |
| Session Management | Clean Session flag | Clean Start + Session Expiry Interval |
| Flow Control | Tidak | Ya β Receive Maximum |
| Disconnect with reason | Tidak | Ya β DISCONNECT payload |
| Server-side disconnect | Tidak | Ya β server bisa disconnect client |
MQTT v5 tidak backward-compatible dengan v3.1.1. Client v5 tidak bisa langsung connect ke broker v3.1.1 dan sebaliknya. Namun, broker modern seperti Mosquitto, EMQX, dan HiveMQ mendukung kedua versi secara simultan. Saat connect, client mengirim versi di CONNECT packet dan broker merespons sesuai.
2. User Properties
User Properties adalah fitur paling fleksibel di MQTT v5. Fitur ini memungkinkan developer menambahkan metadata custom dalam bentuk key-value pairs pada hampir semua packet β mirip HTTP headers. User Properties berguna untuk tracing, routing, dan metadata aplikasi.
Packet yang Mendukung User Properties
| Packet Type | Support User Properties? | Penggunaan Umum |
|---|---|---|
| CONNECT | β Ya | Client info, version, metadata |
| CONNACK | β Ya | Server info, assigned client ID |
| PUBLISH | β Ya | Message metadata, tracing info |
| PUBACK/PUBREC/PUBREL/PUBCOMP | β Ya | ACK metadata |
| SUBSCRIBE | β Ya | Subscription metadata |
| SUBACK | β Ya | Server response metadata |
| DISCONNECT | β Ya | Disconnect reason details |
| AUTH | β Ya | Authentication data |
Contoh Penggunaan User Properties
import paho.mqtt.client as mqtt
import json
import uuid
from datetime import datetime
# ===== PUBLISH dengan User Properties =====
def publish_with_properties(client):
"""Publish message dengan user properties untuk tracing"""
# Payload: data sensor
payload = json.dumps({
"temperature": 25.3,
"humidity": 62.1,
"device": "sensor-001"
})
# User properties β metadata tambahan
user_properties = {
"trace-id": str(uuid.uuid4()),
"source": "edge-gateway-01",
"content-encoding": "utf-8",
"priority": "high",
"timestamp": datetime.utcnow().isoformat(),
"environment": "production"
}
# Publish dengan properties
properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
properties.UserProperty = [
(key, value) for key, value in user_properties.items()
]
client.publish(
topic="factory/sensors/temperature",
payload=payload,
qos=1,
properties=properties
)
# ===== SUBSCRIBE dan baca User Properties =====
def on_message(client, userdata, message):
"""Callback saat menerima message dengan properties"""
payload = json.loads(message.payload.decode())
# Akses user properties
if hasattr(message.properties, 'UserProperty'):
for key, value in message.properties.UserProperty:
print(f" {key}: {value}")
# Contoh: trace request
if key == "trace-id":
print(f" β Trace ID: {value}")
print(f"Topic: {message.topic}")
print(f"Payload: {payload}")
# Setup client MQTT v5
client = mqtt.Client(
protocol=mqtt.MQTTv5,
client_id="sensor-client-001"
)
# Connect dengan properties
connect_properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 3600 # 1 jam
connect_properties.UserProperty = [
("device-type", "gateway"),
("firmware-version", "2.1.0")
]
client.connect("broker.example.com", 1883, properties=connect_properties)
client.on_message = on_message
client.subscribe("factory/sensors/#")
client.loop_forever()
3. Shared Subscriptions
Shared Subscriptions memungkinkan beberapa client berlangganan ke satu topic dengan cara yang load-balanced. Pesan yang dikirim ke topic tersebut akan didistribusikan ke salah satu subscriber dalam grup, bukan ke semua subscriber. Ini sangat berguna untuk horizontal scaling consumer.
Syntax Shared Subscription
# Format Shared Subscription:
# $share/{share-name}/{topic-filter}
# Contoh:
# $share/analytics-group/factory/sensors/#
# $share/alert-handler/factory/alerts/#
# $share/data-processor/sensor/data/+
# Skema distribusi:
#
# ββββ Consumer A (member grup analytics)
# Publisher ββBrokerββ€
# ββββ Consumer B (member grup analytics)
#
# Broker akan memilih A atau B untuk setiap message.
# Bisa round-robin, random, atau sticky (tergantung implementasi broker).
#
# Tanpa shared subscription:
# ββββ Consumer A (dapat semua message)
# Publisher ββBrokerββ€
# ββββ Consumer B (dapat semua message juga)
# β DUPLIKASI! Dengan shared sub: hanya 1 yang dapat.
Implementasi Shared Subscriptions
import paho.mqtt.client as mqtt
import json
import os
import threading
# Worker ID unik
WORKER_ID = f"worker-{os.getpid()}"
def on_connect(client, userdata, flags, rc, properties=None):
print(f"[{WORKER_ID}] Connected with rc={rc}")
# Subscribe ke shared subscription
# Semua worker dengan share name "data-processor"
# akan menerima message secara bergantian
client.subscribe(
"$share/data-processor/factory/sensors/#",
qos=1
)
print(f"[{WORKER_ID}] Subscribed to shared group")
def on_message(client, userdata, message):
"""Setiap message hanya dikirim ke SATU worker dalam grup"""
payload = json.loads(message.payload.decode())
print(f"[{WORKER_ID}] Processing: {payload}")
# Simulasi processing
import time
time.sleep(0.1)
# Publish hasil processing
result = {
"worker": WORKER_ID,
"processed": True,
"original_topic": message.topic,
"result": analyze_data(payload)
}
client.publish(
"factory/processed/sensors",
json.dumps(result),
qos=1
)
def analyze_data(data):
"""Analisis data sensor"""
temp = data.get("temperature", 0)
if temp > 30:
return {"status": "warning", "message": "Suhu tinggi"}
return {"status": "normal"}
# Setup client MQTT v5
client = mqtt.Client(
protocol=mqtt.MQTTv5,
client_id=WORKER_ID
)
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.example.com", 1883)
client.loop_forever()
- EMQX: Mendukung penuh, round-robin default, bisa dikonfigurasi
- HiveMQ: Mendukung penuh, round-robin default
- Mosquitto: Mendukung dari versi 2.0+
- RabbitMQ (MQTT plugin): Mendukung, distribusi ke queue
- Amazon IoT Core: Mendukung penuh
- Azure IoT Hub: Mendukung untuk device-to-cloud
4. Message Expiry Interval
Message Expiry Interval memungkinkan publisher menentukan berapa lama pesan harus disimpan oleh broker sebelum dianggap kedaluwarsa. Ini sangat penting untuk data real-time yang tidak relevan setelah beberapa waktu β seperti data sensor yang hanya valid untuk beberapa detik.
Cara Kerja Message Expiry
# Message Expiry Interval dalam aksi: Publisher Broker Subscriber (offline) | | | |--- PUBLISH ------------->| | | topic: sensor/temp | | | payload: 25.3Β°C | (disimpan karena | | expiry: 300 detik | subscriber offline) | | | | | ... 60 detik berlalu ... | | | | | | (expiry sekarang = 240s) | | | (dikurangi setiap detik) | | | | | ... 300 detik berlalu ... | | | | | | (PESAN DIHAPUS!) | | | (subscriber tidak | | | akan pernah menerima) | # Perbedaan dengan QoS 1/2 retained message: # - Retained message: disimpan selamanya di broker # - Expiry: otomatis dihapus setelah TTL habis # - Kombinasi: retained + expiry = data selalu up-to-date # Jika subscriber online: # - Broker mengirim message langsung # - Expiry interval DIKURANGI waktu penyimpanan # - Subscriber menerima message dengan sisa expiry
Implementasi Message Expiry
import paho.mqtt.client as mqtt
import json
from datetime import datetime
def publish_sensor_data():
client = mqtt.Client(protocol=mqtt.MQTTv5, client_id="sensor-publisher")
client.connect("broker.example.com", 1883)
# Data sensor real-time β hanya relevan 60 detik
data = {
"temperature": 26.8,
"humidity": 58.3,
"timestamp": datetime.utcnow().isoformat()
}
# Setup message expiry
publish_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
publish_properties.MessageExpiryInterval = 60 # 60 detik
# Publish dengan expiry
info = client.publish(
topic="factory/sensors/temperature",
payload=json.dumps(data),
qos=1,
properties=publish_properties
)
info.wait_for_publish()
print(f"Message published, expires in 60 seconds")
# Data alert β relevan lebih lama (5 menit)
alert_data = {
"alert_type": "high_temperature",
"value": 45.2,
"threshold": 40.0,
"severity": "critical"
}
alert_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
alert_properties.MessageExpiryInterval = 300 # 5 menit
client.publish(
topic="factory/alerts/temperature",
payload=json.dumps(alert_data),
qos=2, # Exactly once untuk alert kritis
properties=alert_properties
)
# Data konfigurasi β tidak expire (0 = tidak pernah expire)
config_data = {
"sampling_rate": 10,
"threshold_high": 40.0,
"threshold_low": 10.0
}
config_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
config_properties.MessageExpiryInterval = 0 # Tidak pernah expire
config_properties.Retain = True
client.publish(
topic="factory/config/sensor-001",
payload=json.dumps(config_data),
qos=1,
properties=config_properties
)
client.disconnect()
publish_sensor_data()
5. Topic Aliases
Topic Aliases memungkinkan penggantian topic string yang panjang dengan integer 16-bit untuk menghemat bandwidth. Ini sangat bermanfaat untuk perangkat IoT yang mengirim data frekuensi tinggi ke topic yang sama β topic string hanya perlu dikirim sekali, seterusnya cukup gunakan alias numerik.
Cara Kerja Topic Alias
# Penghematan bandwidth dengan Topic Alias: # PUBLISH 1 (tanpa alias, topic string lengkap): # Topic: "factory/floor-3/production-line-2/station-5/sensors/temperature" # β 65 bytes hanya untuk topic! # PUBLISH 1 (dengan alias establishment): # Topic: "factory/floor-3/production-line-2/station-5/sensors/temperature" # Topic Alias: 1 # β Broker menyimpan mapping: alias 1 β topic di atas # PUBLISH 2+ (menggunakan alias saja): # Topic: "" (kosong!) # Topic Alias: 1 # β Broker mengganti "" dengan topic yang sudah di-map # β HEMAT: dari 65 bytes menjadi 0 bytes untuk topic! # Perhitungan penghematan: # Misal: 1 message per detik, topic 65 bytes # Tanpa alias: 65 bytes Γ 86400 detik = 5.6 MB/hari (hanya untuk topic!) # Dengan alias: 65 bytes (pertama) + 0 bytes Γ 83599 = 65 bytes/hari # Penghematan: 99.99% # Broker-side: Topic Alias Maximum ditentukan di CONNACK # Client-side: Client menentukan Topic Alias di PUBLISH
Implementasi Topic Alias
import paho.mqtt.client as mqtt
import json
import time
def publish_with_topic_alias():
"""Mengirim data sensor dengan topic alias untuk efisiensi bandwidth"""
client = mqtt.Client(protocol=mqtt.MQTTv5, client_id="alias-demo")
client.connect("broker.example.com", 1883)
topics = {
1: "factory/floor-3/line-2/station-5/sensors/temperature",
2: "factory/floor-3/line-2/station-5/sensors/humidity",
3: "factory/floor-3/line-2/station-5/sensors/pressure",
}
for i in range(100):
for alias, topic_name in topics.items():
# Publish dengan topic alias
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.TopicAlias = alias
# Pertama kali: sertakan topic string
# Setelahnya: topic bisa kosong, cukup alias
data = {"value": 25.0 + (i * 0.1), "seq": i}
effective_topic = topic_name if i == 0 else ""
client.publish(
topic=effective_topic,
payload=json.dumps(data),
qos=0,
properties=props
)
time.sleep(1) # 1 detik interval
client.disconnect()
publish_with_topic_alias()
6. Reason Codes
MQTT v3.1.1 hanya memiliki reason codes di CONNACK. MQTT v5 memperluas ini ke semua acknowledgment packets, memberikan informasi detail mengapa suatu operasi berhasil atau gagal. Ini sangat membantu dalam debugging dan error handling.
Reason Codes Lengkap
| Code | Nama | Packet | Deskripsi |
|---|---|---|---|
| 0x00 | Success | CONNACK, PUBACK, SUBACK, dll. | Operasi berhasil |
| 0x01 | Granted QoS 1 | CONNACK, SUBACK | QoS 1 granted |
| 0x02 | Granted QoS 2 | CONNACK, SUBACK | QoS 2 granted |
| 0x04 | Disconnect with Will | DISCONNECT | Disconnect, broker publish will message |
| 0x10 | No matching subscribers | PUBACK, PUBREC | Tidak ada subscriber cocok |
| 0x11 | No subscription existed | UNSUBACK | Subscription tidak ditemukan |
| 0x80 | Unspecified error | Semua ACK | Error umum |
| 0x81 | Malformed Packet | CONNACK, DISCONNECT | Packet format salah |
| 0x82 | Protocol Error | CONNACK, DISCONNECT | Protocol violation |
| 0x83 | Implementation specific error | CONNACK, SUBACK | Error spesifik implementasi |
| 0x84 | Unsupported Protocol Version | CONNACK | Versi protokol tidak didukung |
| 0x85 | Client Identifier not valid | CONNACK | Client ID tidak valid |
| 0x86 | Bad User Name or Password | CONNACK | Autentikasi gagal |
| 0x87 | Not authorized | Semua | Tidak punya izin |
| 0x8C | Topic Name invalid | PUBLISH error | Topic format tidak valid |
| 0x90 | Topic Name invalid | SUBACK | Topic filter tidak valid |
| 0x95 | Quota exceeded | Semua | Batas quota terlampaui |
| 0x9A | Payload format invalid | PUBACK, DISCONNECT | Format payload tidak sesuai |
7. Session & Connection Management
MQTT v5 mengganti Clean Session dengan Clean Start dan Session Expiry Interval, memberikan kontrol yang lebih granular atas session lifecycle.
Session Expiry Interval
# MQTT v5 Session Management: # Clean Start = true, Session Expiry = 0: # β Session baru, langsung expired saat disconnect # β Sama seperti Clean Session = true di v3.1.1 # Clean Start = false, Session Expiry = 0: # β Resume session lama, tapi expired saat disconnect # β Mirip Clean Session = false di v3.1.1 # Clean Start = false, Session Expiry = 3600: # β Resume session lama # β Session disimpan 1 jam setelah disconnect # β Bisa reconnect dalam 1 jam tanpa kehilangan queued messages # Clean Start = true, Session Expiry = 86400 (24 jam): # β Session baru dimulai, tapi session LAMA (jika ada) disimpan 24 jam # β Jika reconnect dalam 24 jam, session lama bisa di-resume # Session Expiry = 0xFFFFFFFF (max value): # β Session TIDAK PERNAH expired # β Hanya dihapus secara manual oleh broker # β Gunakan dengan hati-hati! # === Server-side DISCONNECT === # MQTT v5 memungkinkan SERVER mengirim DISCONNECT ke client # Contoh: server ingin maintenance, atau client melanggar policy # Client menerima DISCONNECT dengan reason code # Client bisa reconnect setelah interval tertentu
Request/Response Pattern
Fitur baru yang powerful di MQTT v5: native request/response pattern melalui Response Topic dan Correlation Data.
import paho.mqtt.client as mqtt
import json
import uuid
# ===== SERVICE (menunggu request, mengirim response) =====
def handle_request(client, userdata, message):
"""Menerima request dan mengirim response ke Response Topic"""
request = json.loads(message.payload.decode())
# Ambil response topic dan correlation data
response_topic = message.properties.ResponseTopic
correlation = message.properties.CorrelationData
# Process request
result = process_query(request)
# Kirim response
response_props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
response_props.CorrelationData = correlation
client.publish(
topic=response_topic,
payload=json.dumps(result),
qos=1,
properties=response_props
)
# ===== CLIENT (mengirim request, menunggu response) =====
def send_request(client):
"""Mengirim request dan subscribe ke response topic"""
correlation_id = str(uuid.uuid4()).encode()
# Subscribe ke unique response topic
response_topic = f"responses/{client._client_id.decode()}"
client.subscribe(response_topic, qos=1)
# Kirim request
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
props.ResponseTopic = response_topic
props.CorrelationData = correlation_id
props.MessageExpiryInterval = 30 # Response harus dalam 30 detik
client.publish(
topic="services/data-query",
payload=json.dumps({"query": "SELECT * FROM sensors WHERE temp > 30"}),
qos=1,
properties=props
)
8. Implementasi dengan Paho MQTT
Contoh lengkap implementasi MQTT v5 client menggunakan library Eclipse Paho untuk Python dengan semua fitur v5.
import paho.mqtt.client as mqtt
import json
import time
import ssl
import uuid
class MQTTv5Client:
"""MQTT v5 client lengkap dengan semua fitur"""
def __init__(self, broker, port, client_id=None):
self.broker = broker
self.port = port
self.client_id = client_id or f"client-{uuid.uuid4().hex[:8]}"
# Buat client MQTT v5
self.client = mqtt.Client(
protocol=mqtt.MQTTv5,
client_id=self.client_id
)
# Setup TLS
self.client.tls_set(
cert_reqs=ssl.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS
)
# Setup callbacks
self.client.on_connect = self._on_connect
self.client.on_message = self._on_message
self.client.on_disconnect = self._on_disconnect
# Topic alias mapping
self.topic_aliases = {}
self.next_alias = 1
def connect(self, username=None, password=None):
"""Connect ke broker dengan properties"""
if username:
self.client.username_pw_set(username, password)
# Connect properties
props = mqtt.Properties(mqtt.PacketTypes.CONNECT)
props.SessionExpiryInterval = 3600 # 1 jam
props.ReceiveMaximum = 50 # Max inflight messages
props.MaximumPacketSize = 65536 # 64KB max
props.UserProperty = [
("client-version", "1.0.0"),
("platform", "python")
]
self.client.connect(
self.broker, self.port,
keepalive=60,
properties=props
)
def subscribe_shared(self, topic, group, qos=1):
"""Subscribe ke shared subscription"""
shared_topic = f"$share/{group}/{topic}"
props = mqtt.Properties(mqtt.PacketTypes.SUBSCRIBE)
props.UserProperty = [("subscription-type", "shared")]
self.client.subscribe(shared_topic, qos, properties=props)
def publish_with_alias(self, topic, payload, qos=1, expiry=None):
"""Publish dengan topic alias"""
props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
# Setup topic alias
if topic not in self.topic_aliases:
self.topic_aliases[topic] = self.next_alias
self.next_alias += 1
props.TopicAlias = self.topic_aliases[topic]
# Hanya kirim topic string pertama kali
actual_topic = topic if len([t for t in self.topic_aliases if t == topic]) == 1 else topic
if expiry:
props.MessageExpiryInterval = expiry
self.client.publish(
topic=actual_topic,
payload=json.dumps(payload),
qos=qos,
properties=props
)
def _on_connect(self, client, userdata, flags, rc, properties):
print(f"Connected: {rc}")
if properties:
if hasattr(properties, 'TopicAliasMaximum'):
print(f"Max Topic Alias: {properties.TopicAliasMaximum}")
if hasattr(properties, 'ServerKeepAlive'):
print(f"Server Keep Alive: {properties.ServerKeepAlive}s")
def _on_message(self, client, userdata, message):
payload = json.loads(message.payload.decode())
props = message.properties
print(f"\n{'='*40}")
print(f"Topic: {message.topic}")
print(f"QoS: {message.qos}")
print(f"Payload: {json.dumps(payload, indent=2)}")
if hasattr(props, 'MessageExpiryInterval') and props.MessageExpiryInterval is not None:
print(f"Sisa Expiry: {props.MessageExpiryInterval}s")
if hasattr(props, 'UserProperty') and props.UserProperty:
print(f"User Properties:")
for key, value in props.UserProperty:
print(f" {key}: {value}")
def _on_disconnect(self, client, userdata, rc, properties=None):
print(f"Disconnected: {rc}")
if properties and hasattr(properties, 'ReasonString'):
print(f"Reason: {properties.ReasonString}")
def loop_forever(self):
self.client.loop_forever()
# Usage
if __name__ == "__main__":
client = MQTTv5Client("broker.example.com", 8883)
client.connect("user", "password")
client.subscribe_shared("factory/sensors/#", "analytics")
client.loop_forever()