Protokol

MQTT v5: Fitur Terbaru

Panduan lengkap MQTT version 5.0 β€” user properties, shared subscriptions, message expiry interval, topic aliases, reason codes, dan semua peningkatan dari v3.1.1 untuk arsitektur IoT enterprise

1. Apa yang Baru di MQTT v5?

MQTT v5.0 dirilis pada Maret 2019 oleh OASIS, membawa peningkatan signifikan dari versi 3.1.1. Versi ini menambahkan fitur-fitur enterprise yang sebelumnya tidak ada, membuat MQTT lebih cocok untuk deployment skala besar dengan kebutuhan manajemen yang lebih baik.

Perbandingan MQTT v3.1.1 vs v5.0

Fitur MQTT v3.1.1 MQTT v5.0
User PropertiesTidak adaYa β€” key-value pairs di setiap packet
Shared SubscriptionsTidak standarYa β€” load balancing bawaan
Message ExpiryTidakYa β€” TTL per message
Topic AliasesTidakYa β€” numeric alias untuk topic string
Reason CodesHanya CONNACKYa β€” di semua ACK packet
Request/ResponseTidak nativeYa β€” Response Topic + Correlation Data
Session ManagementClean Session flagClean Start + Session Expiry Interval
Flow ControlTidakYa β€” Receive Maximum
Disconnect with reasonTidakYa β€” DISCONNECT payload
Server-side disconnectTidakYa β€” server bisa disconnect client
πŸ’‘ Kompatibilitas

MQTT v5 tidak backward-compatible dengan v3.1.1. Client v5 tidak bisa langsung connect ke broker v3.1.1 dan sebaliknya. Namun, broker modern seperti Mosquitto, EMQX, dan HiveMQ mendukung kedua versi secara simultan. Saat connect, client mengirim versi di CONNECT packet dan broker merespons sesuai.

2. User Properties

User Properties adalah fitur paling fleksibel di MQTT v5. Fitur ini memungkinkan developer menambahkan metadata custom dalam bentuk key-value pairs pada hampir semua packet β€” mirip HTTP headers. User Properties berguna untuk tracing, routing, dan metadata aplikasi.

Packet yang Mendukung User Properties

Packet Type Support User Properties? Penggunaan Umum
CONNECTβœ… YaClient info, version, metadata
CONNACKβœ… YaServer info, assigned client ID
PUBLISHβœ… YaMessage metadata, tracing info
PUBACK/PUBREC/PUBREL/PUBCOMPβœ… YaACK metadata
SUBSCRIBEβœ… YaSubscription metadata
SUBACKβœ… YaServer response metadata
DISCONNECTβœ… YaDisconnect reason details
AUTHβœ… YaAuthentication data

Contoh Penggunaan User Properties

Python
import paho.mqtt.client as mqtt
import json
import uuid
from datetime import datetime

# ===== PUBLISH dengan User Properties =====
def publish_with_properties(client):
    """Publish message dengan user properties untuk tracing"""

    # Payload: data sensor
    payload = json.dumps({
        "temperature": 25.3,
        "humidity": 62.1,
        "device": "sensor-001"
    })

    # User properties β€” metadata tambahan
    user_properties = {
        "trace-id": str(uuid.uuid4()),
        "source": "edge-gateway-01",
        "content-encoding": "utf-8",
        "priority": "high",
        "timestamp": datetime.utcnow().isoformat(),
        "environment": "production"
    }

    # Publish dengan properties
    properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    properties.UserProperty = [
        (key, value) for key, value in user_properties.items()
    ]

    client.publish(
        topic="factory/sensors/temperature",
        payload=payload,
        qos=1,
        properties=properties
    )

# ===== SUBSCRIBE dan baca User Properties =====
def on_message(client, userdata, message):
    """Callback saat menerima message dengan properties"""
    payload = json.loads(message.payload.decode())

    # Akses user properties
    if hasattr(message.properties, 'UserProperty'):
        for key, value in message.properties.UserProperty:
            print(f"  {key}: {value}")

            # Contoh: trace request
            if key == "trace-id":
                print(f"  β†’ Trace ID: {value}")

    print(f"Topic: {message.topic}")
    print(f"Payload: {payload}")

# Setup client MQTT v5
client = mqtt.Client(
    protocol=mqtt.MQTTv5,
    client_id="sensor-client-001"
)

# Connect dengan properties
connect_properties = mqtt.Properties(mqtt.PacketTypes.CONNECT)
connect_properties.SessionExpiryInterval = 3600  # 1 jam
connect_properties.UserProperty = [
    ("device-type", "gateway"),
    ("firmware-version", "2.1.0")
]

client.connect("broker.example.com", 1883, properties=connect_properties)
client.on_message = on_message
client.subscribe("factory/sensors/#")
client.loop_forever()

3. Shared Subscriptions

Shared Subscriptions memungkinkan beberapa client berlangganan ke satu topic dengan cara yang load-balanced. Pesan yang dikirim ke topic tersebut akan didistribusikan ke salah satu subscriber dalam grup, bukan ke semua subscriber. Ini sangat berguna untuk horizontal scaling consumer.

Syntax Shared Subscription

Text
# Format Shared Subscription:
# $share/{share-name}/{topic-filter}

# Contoh:
# $share/analytics-group/factory/sensors/#
# $share/alert-handler/factory/alerts/#
# $share/data-processor/sensor/data/+

# Skema distribusi:
#
#                    β”Œβ”€β”€β†’ Consumer A (member grup analytics)
# Publisher ──Broker──
#                    └──→ Consumer B (member grup analytics)
#
# Broker akan memilih A atau B untuk setiap message.
# Bisa round-robin, random, atau sticky (tergantung implementasi broker).
#
# Tanpa shared subscription:
#                    β”Œβ”€β”€β†’ Consumer A (dapat semua message)
# Publisher ──Broker──
#                    └──→ Consumer B (dapat semua message juga)
# β†’ DUPLIKASI! Dengan shared sub: hanya 1 yang dapat.

Implementasi Shared Subscriptions

Python
import paho.mqtt.client as mqtt
import json
import os
import threading

# Worker ID unik
WORKER_ID = f"worker-{os.getpid()}"

def on_connect(client, userdata, flags, rc, properties=None):
    print(f"[{WORKER_ID}] Connected with rc={rc}")

    # Subscribe ke shared subscription
    # Semua worker dengan share name "data-processor"
    # akan menerima message secara bergantian
    client.subscribe(
        "$share/data-processor/factory/sensors/#",
        qos=1
    )
    print(f"[{WORKER_ID}] Subscribed to shared group")

def on_message(client, userdata, message):
    """Setiap message hanya dikirim ke SATU worker dalam grup"""
    payload = json.loads(message.payload.decode())
    print(f"[{WORKER_ID}] Processing: {payload}")

    # Simulasi processing
    import time
    time.sleep(0.1)

    # Publish hasil processing
    result = {
        "worker": WORKER_ID,
        "processed": True,
        "original_topic": message.topic,
        "result": analyze_data(payload)
    }

    client.publish(
        "factory/processed/sensors",
        json.dumps(result),
        qos=1
    )

def analyze_data(data):
    """Analisis data sensor"""
    temp = data.get("temperature", 0)
    if temp > 30:
        return {"status": "warning", "message": "Suhu tinggi"}
    return {"status": "normal"}

# Setup client MQTT v5
client = mqtt.Client(
    protocol=mqtt.MQTTv5,
    client_id=WORKER_ID
)

client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.example.com", 1883)
client.loop_forever()
πŸ“š Shared Subscription di Berbagai Broker
  • EMQX: Mendukung penuh, round-robin default, bisa dikonfigurasi
  • HiveMQ: Mendukung penuh, round-robin default
  • Mosquitto: Mendukung dari versi 2.0+
  • RabbitMQ (MQTT plugin): Mendukung, distribusi ke queue
  • Amazon IoT Core: Mendukung penuh
  • Azure IoT Hub: Mendukung untuk device-to-cloud

4. Message Expiry Interval

Message Expiry Interval memungkinkan publisher menentukan berapa lama pesan harus disimpan oleh broker sebelum dianggap kedaluwarsa. Ini sangat penting untuk data real-time yang tidak relevan setelah beberapa waktu β€” seperti data sensor yang hanya valid untuk beberapa detik.

Cara Kerja Message Expiry

Text
# Message Expiry Interval dalam aksi:

Publisher                    Broker                    Subscriber (offline)
  |                           |                           |
  |--- PUBLISH ------------->|                           |
  |   topic: sensor/temp     |                           |
  |   payload: 25.3Β°C        |  (disimpan karena         |
  |   expiry: 300 detik      |   subscriber offline)     |
  |                           |                           |
  |     ... 60 detik berlalu ...                         |
  |                           |                           |
  |                           |  (expiry sekarang = 240s) |
  |                           |  (dikurangi setiap detik) |
  |                           |                           |
  |     ... 300 detik berlalu ...                        |
  |                           |                           |
  |                           |  (PESAN DIHAPUS!)         |
  |                           |  (subscriber tidak        |
  |                           |   akan pernah menerima)   |

# Perbedaan dengan QoS 1/2 retained message:
# - Retained message: disimpan selamanya di broker
# - Expiry: otomatis dihapus setelah TTL habis
# - Kombinasi: retained + expiry = data selalu up-to-date

# Jika subscriber online:
# - Broker mengirim message langsung
# - Expiry interval DIKURANGI waktu penyimpanan
# - Subscriber menerima message dengan sisa expiry

Implementasi Message Expiry

Python
import paho.mqtt.client as mqtt
import json
from datetime import datetime

def publish_sensor_data():
    client = mqtt.Client(protocol=mqtt.MQTTv5, client_id="sensor-publisher")
    client.connect("broker.example.com", 1883)

    # Data sensor real-time β€” hanya relevan 60 detik
    data = {
        "temperature": 26.8,
        "humidity": 58.3,
        "timestamp": datetime.utcnow().isoformat()
    }

    # Setup message expiry
    publish_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    publish_properties.MessageExpiryInterval = 60  # 60 detik

    # Publish dengan expiry
    info = client.publish(
        topic="factory/sensors/temperature",
        payload=json.dumps(data),
        qos=1,
        properties=publish_properties
    )
    info.wait_for_publish()
    print(f"Message published, expires in 60 seconds")

    # Data alert β€” relevan lebih lama (5 menit)
    alert_data = {
        "alert_type": "high_temperature",
        "value": 45.2,
        "threshold": 40.0,
        "severity": "critical"
    }

    alert_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    alert_properties.MessageExpiryInterval = 300  # 5 menit

    client.publish(
        topic="factory/alerts/temperature",
        payload=json.dumps(alert_data),
        qos=2,  # Exactly once untuk alert kritis
        properties=alert_properties
    )

    # Data konfigurasi β€” tidak expire (0 = tidak pernah expire)
    config_data = {
        "sampling_rate": 10,
        "threshold_high": 40.0,
        "threshold_low": 10.0
    }

    config_properties = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    config_properties.MessageExpiryInterval = 0  # Tidak pernah expire
    config_properties.Retain = True

    client.publish(
        topic="factory/config/sensor-001",
        payload=json.dumps(config_data),
        qos=1,
        properties=config_properties
    )

    client.disconnect()

publish_sensor_data()

5. Topic Aliases

Topic Aliases memungkinkan penggantian topic string yang panjang dengan integer 16-bit untuk menghemat bandwidth. Ini sangat bermanfaat untuk perangkat IoT yang mengirim data frekuensi tinggi ke topic yang sama β€” topic string hanya perlu dikirim sekali, seterusnya cukup gunakan alias numerik.

Cara Kerja Topic Alias

Text
# Penghematan bandwidth dengan Topic Alias:

# PUBLISH 1 (tanpa alias, topic string lengkap):
# Topic: "factory/floor-3/production-line-2/station-5/sensors/temperature"
# β†’ 65 bytes hanya untuk topic!

# PUBLISH 1 (dengan alias establishment):
# Topic: "factory/floor-3/production-line-2/station-5/sensors/temperature"
# Topic Alias: 1
# β†’ Broker menyimpan mapping: alias 1 β†’ topic di atas

# PUBLISH 2+ (menggunakan alias saja):
# Topic: ""  (kosong!)
# Topic Alias: 1
# β†’ Broker mengganti "" dengan topic yang sudah di-map
# β†’ HEMAT: dari 65 bytes menjadi 0 bytes untuk topic!

# Perhitungan penghematan:
# Misal: 1 message per detik, topic 65 bytes
# Tanpa alias: 65 bytes Γ— 86400 detik = 5.6 MB/hari (hanya untuk topic!)
# Dengan alias: 65 bytes (pertama) + 0 bytes Γ— 83599 = 65 bytes/hari
# Penghematan: 99.99%

# Broker-side: Topic Alias Maximum ditentukan di CONNACK
# Client-side: Client menentukan Topic Alias di PUBLISH

Implementasi Topic Alias

Python
import paho.mqtt.client as mqtt
import json
import time

def publish_with_topic_alias():
    """Mengirim data sensor dengan topic alias untuk efisiensi bandwidth"""

    client = mqtt.Client(protocol=mqtt.MQTTv5, client_id="alias-demo")
    client.connect("broker.example.com", 1883)

    topics = {
        1: "factory/floor-3/line-2/station-5/sensors/temperature",
        2: "factory/floor-3/line-2/station-5/sensors/humidity",
        3: "factory/floor-3/line-2/station-5/sensors/pressure",
    }

    for i in range(100):
        for alias, topic_name in topics.items():
            # Publish dengan topic alias
            props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
            props.TopicAlias = alias

            # Pertama kali: sertakan topic string
            # Setelahnya: topic bisa kosong, cukup alias
            data = {"value": 25.0 + (i * 0.1), "seq": i}

            effective_topic = topic_name if i == 0 else ""

            client.publish(
                topic=effective_topic,
                payload=json.dumps(data),
                qos=0,
                properties=props
            )

        time.sleep(1)  # 1 detik interval

    client.disconnect()

publish_with_topic_alias()

6. Reason Codes

MQTT v3.1.1 hanya memiliki reason codes di CONNACK. MQTT v5 memperluas ini ke semua acknowledgment packets, memberikan informasi detail mengapa suatu operasi berhasil atau gagal. Ini sangat membantu dalam debugging dan error handling.

Reason Codes Lengkap

Code Nama Packet Deskripsi
0x00SuccessCONNACK, PUBACK, SUBACK, dll.Operasi berhasil
0x01Granted QoS 1CONNACK, SUBACKQoS 1 granted
0x02Granted QoS 2CONNACK, SUBACKQoS 2 granted
0x04Disconnect with WillDISCONNECTDisconnect, broker publish will message
0x10No matching subscribersPUBACK, PUBRECTidak ada subscriber cocok
0x11No subscription existedUNSUBACKSubscription tidak ditemukan
0x80Unspecified errorSemua ACKError umum
0x81Malformed PacketCONNACK, DISCONNECTPacket format salah
0x82Protocol ErrorCONNACK, DISCONNECTProtocol violation
0x83Implementation specific errorCONNACK, SUBACKError spesifik implementasi
0x84Unsupported Protocol VersionCONNACKVersi protokol tidak didukung
0x85Client Identifier not validCONNACKClient ID tidak valid
0x86Bad User Name or PasswordCONNACKAutentikasi gagal
0x87Not authorizedSemuaTidak punya izin
0x8CTopic Name invalidPUBLISH errorTopic format tidak valid
0x90Topic Name invalidSUBACKTopic filter tidak valid
0x95Quota exceededSemuaBatas quota terlampaui
0x9APayload format invalidPUBACK, DISCONNECTFormat payload tidak sesuai

7. Session & Connection Management

MQTT v5 mengganti Clean Session dengan Clean Start dan Session Expiry Interval, memberikan kontrol yang lebih granular atas session lifecycle.

Session Expiry Interval

Text
# MQTT v5 Session Management:

# Clean Start = true, Session Expiry = 0:
# β†’ Session baru, langsung expired saat disconnect
# β†’ Sama seperti Clean Session = true di v3.1.1

# Clean Start = false, Session Expiry = 0:
# β†’ Resume session lama, tapi expired saat disconnect
# β†’ Mirip Clean Session = false di v3.1.1

# Clean Start = false, Session Expiry = 3600:
# β†’ Resume session lama
# β†’ Session disimpan 1 jam setelah disconnect
# β†’ Bisa reconnect dalam 1 jam tanpa kehilangan queued messages

# Clean Start = true, Session Expiry = 86400 (24 jam):
# β†’ Session baru dimulai, tapi session LAMA (jika ada) disimpan 24 jam
# β†’ Jika reconnect dalam 24 jam, session lama bisa di-resume

# Session Expiry = 0xFFFFFFFF (max value):
# β†’ Session TIDAK PERNAH expired
# β†’ Hanya dihapus secara manual oleh broker
# β†’ Gunakan dengan hati-hati!

# === Server-side DISCONNECT ===
# MQTT v5 memungkinkan SERVER mengirim DISCONNECT ke client
# Contoh: server ingin maintenance, atau client melanggar policy
# Client menerima DISCONNECT dengan reason code
# Client bisa reconnect setelah interval tertentu

Request/Response Pattern

Fitur baru yang powerful di MQTT v5: native request/response pattern melalui Response Topic dan Correlation Data.

Python
import paho.mqtt.client as mqtt
import json
import uuid

# ===== SERVICE (menunggu request, mengirim response) =====
def handle_request(client, userdata, message):
    """Menerima request dan mengirim response ke Response Topic"""
    request = json.loads(message.payload.decode())

    # Ambil response topic dan correlation data
    response_topic = message.properties.ResponseTopic
    correlation = message.properties.CorrelationData

    # Process request
    result = process_query(request)

    # Kirim response
    response_props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    response_props.CorrelationData = correlation

    client.publish(
        topic=response_topic,
        payload=json.dumps(result),
        qos=1,
        properties=response_props
    )

# ===== CLIENT (mengirim request, menunggu response) =====
def send_request(client):
    """Mengirim request dan subscribe ke response topic"""
    correlation_id = str(uuid.uuid4()).encode()

    # Subscribe ke unique response topic
    response_topic = f"responses/{client._client_id.decode()}"
    client.subscribe(response_topic, qos=1)

    # Kirim request
    props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)
    props.ResponseTopic = response_topic
    props.CorrelationData = correlation_id
    props.MessageExpiryInterval = 30  # Response harus dalam 30 detik

    client.publish(
        topic="services/data-query",
        payload=json.dumps({"query": "SELECT * FROM sensors WHERE temp > 30"}),
        qos=1,
        properties=props
    )

8. Implementasi dengan Paho MQTT

Contoh lengkap implementasi MQTT v5 client menggunakan library Eclipse Paho untuk Python dengan semua fitur v5.

Python
import paho.mqtt.client as mqtt
import json
import time
import ssl
import uuid

class MQTTv5Client:
    """MQTT v5 client lengkap dengan semua fitur"""

    def __init__(self, broker, port, client_id=None):
        self.broker = broker
        self.port = port
        self.client_id = client_id or f"client-{uuid.uuid4().hex[:8]}"

        # Buat client MQTT v5
        self.client = mqtt.Client(
            protocol=mqtt.MQTTv5,
            client_id=self.client_id
        )

        # Setup TLS
        self.client.tls_set(
            cert_reqs=ssl.CERT_REQUIRED,
            tls_version=ssl.PROTOCOL_TLS
        )

        # Setup callbacks
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.on_disconnect = self._on_disconnect

        # Topic alias mapping
        self.topic_aliases = {}
        self.next_alias = 1

    def connect(self, username=None, password=None):
        """Connect ke broker dengan properties"""
        if username:
            self.client.username_pw_set(username, password)

        # Connect properties
        props = mqtt.Properties(mqtt.PacketTypes.CONNECT)
        props.SessionExpiryInterval = 3600  # 1 jam
        props.ReceiveMaximum = 50  # Max inflight messages
        props.MaximumPacketSize = 65536  # 64KB max
        props.UserProperty = [
            ("client-version", "1.0.0"),
            ("platform", "python")
        ]

        self.client.connect(
            self.broker, self.port,
            keepalive=60,
            properties=props
        )

    def subscribe_shared(self, topic, group, qos=1):
        """Subscribe ke shared subscription"""
        shared_topic = f"$share/{group}/{topic}"
        props = mqtt.Properties(mqtt.PacketTypes.SUBSCRIBE)
        props.UserProperty = [("subscription-type", "shared")]
        self.client.subscribe(shared_topic, qos, properties=props)

    def publish_with_alias(self, topic, payload, qos=1, expiry=None):
        """Publish dengan topic alias"""
        props = mqtt.Properties(mqtt.PacketTypes.PUBLISH)

        # Setup topic alias
        if topic not in self.topic_aliases:
            self.topic_aliases[topic] = self.next_alias
            self.next_alias += 1

        props.TopicAlias = self.topic_aliases[topic]

        # Hanya kirim topic string pertama kali
        actual_topic = topic if len([t for t in self.topic_aliases if t == topic]) == 1 else topic

        if expiry:
            props.MessageExpiryInterval = expiry

        self.client.publish(
            topic=actual_topic,
            payload=json.dumps(payload),
            qos=qos,
            properties=props
        )

    def _on_connect(self, client, userdata, flags, rc, properties):
        print(f"Connected: {rc}")
        if properties:
            if hasattr(properties, 'TopicAliasMaximum'):
                print(f"Max Topic Alias: {properties.TopicAliasMaximum}")
            if hasattr(properties, 'ServerKeepAlive'):
                print(f"Server Keep Alive: {properties.ServerKeepAlive}s")

    def _on_message(self, client, userdata, message):
        payload = json.loads(message.payload.decode())
        props = message.properties

        print(f"\n{'='*40}")
        print(f"Topic: {message.topic}")
        print(f"QoS: {message.qos}")
        print(f"Payload: {json.dumps(payload, indent=2)}")

        if hasattr(props, 'MessageExpiryInterval') and props.MessageExpiryInterval is not None:
            print(f"Sisa Expiry: {props.MessageExpiryInterval}s")
        if hasattr(props, 'UserProperty') and props.UserProperty:
            print(f"User Properties:")
            for key, value in props.UserProperty:
                print(f"  {key}: {value}")

    def _on_disconnect(self, client, userdata, rc, properties=None):
        print(f"Disconnected: {rc}")
        if properties and hasattr(properties, 'ReasonString'):
            print(f"Reason: {properties.ReasonString}")

    def loop_forever(self):
        self.client.loop_forever()

# Usage
if __name__ == "__main__":
    client = MQTTv5Client("broker.example.com", 8883)
    client.connect("user", "password")
    client.subscribe_shared("factory/sensors/#", "analytics")
    client.loop_forever()

Quiz Pemahaman

Pertanyaan 1: Fitur apa di MQTT v5 yang memungkinkan menambahkan metadata custom pada PUBLISH packet?

a) Topic Aliases
b) User Properties
c) Reason Codes
d) Shared Subscriptions

Pertanyaan 2: Format topic untuk Shared Subscription di MQTT v5 adalah?

a) shared/{group}/{topic}
b) $share/{group}/{topic}
c) {topic}@{group}
d) group:{group}/{topic}

Pertanyaan 3: Apa fungsi Message Expiry Interval?

a) Menentukan berapa lama client harus disconnect
b) Menentukan TTL pesan di broker sebelum dihapus
c) Menentukan interval keepalive
d) Menentukan retry interval

Pertanyaan 4: Apa yang menggantikan Clean Session di MQTT v5?

a) Persistent Session
b) Clean Start + Session Expiry Interval
c) Session Duration
d) Auto Reconnect

Pertanyaan 5: Apa keuntungan utama Topic Aliases?

a) Enkripsi topik
b) Menghemat bandwidth dengan menggantikan topic string dengan integer
c) Filter message berdasarkan priority
d) Mengatur akses ke topik
πŸ” Zoom
100%
🎨 Tema