Python untuk IoT

Async Python: Pemrograman Asinkron untuk IoT

TOKEN

Pelajari cara membangun sistem IoT yang responsif dan efisien dengan pemrograman asinkron Python β€” asyncio, async/await, event loop, dan integrasi dengan MQTT serta database

1. Sync vs Async: Perbedaan Mendasar

Dalam dunia IoT, perangkat sering kali perlu melakukan banyak hal bersamaan: membaca sensor, mengirim data ke server, menerima perintah dari MQTT, menyimpan ke database, dan memantau koneksi jaringan. Jika semua ini dilakukan secara sinkron (berurutan), sistem akan menjadi lambat dan tidak responsif.

Pemrograman asinkron memungkinkan program kita "menunggu" operasi yang lambat (jaringan, I/O) tanpa memblokir eksekusi keseluruhan. Saat satu operasi menunggu, operasi lain bisa berjalan.

Diagram: Sinkron vs Asinkron
  SINKRON (Berurutan)              ASINKRON (Bersamaan)
  ───────────────────              ────────────────────
  | Baca Sensor |                  | Baca Sensor   |
  | Kirim HTTP  |                  | Kirim HTTP    |
  | Tunggu...   |                  | Tunggu...     |
  | Terima Resp  |                  | Baca Sensor 2 |
  | Baca Sensor 2|                 | Terima Resp   |
  | Simpan DB    |                  | Simpan DB     |
  | Tunggu...    |                  | Tunggu...     8
  | Terima Resp   |                 | Terima Resp   |
  ───────────────────              ────────────────────
  Total: ~8 detik                  Total: ~3 detik

Kapan Menggunakan Async?

Kondisi Rekomendasi
Banyak operasi I/O (HTTP, MQTT, database)βœ… Gunakan async
Perlu handle banyak sensor secara bersamaanβœ… Gunakan async
Server IoT perlu handle banyak clientβœ… Gunakan async
Operasi CPU-intensive (image processing, ML)⚠️ Gunakan multiprocessing
Program sederhana, satu tugas sekaligus❌ Cukup sinkron saja
Script one-shot (baca sekali, kirim, selesai)❌ Cukup sinkron saja
πŸ’‘ Tips

Async bukan "lebih cepat" dari sinkron dalam hal kecepatan eksekusi per-baris kode. Keunggulannya adalah efisiensi waktu tunggu β€” saat satu operasi menunggu I/O, operasi lain bisa mulai. Ini sangat krusial di IoT di mana banyak sensor dan koneksi jaringan berjalan bersamaan.

2. Asyncio Basics

asyncio adalah library bawaan Python (sejak Python 3.4) yang menyediakan infrastruktur untuk menulis kode asinkron. Library ini menjadi fondasi dari ekosistem async Python β€” hampir semua library async modern (aiohttp, aiomqtt, aiosqlite) dibangun di atas asyncio.

2.1 β€” Konsep Dasar

Konsep Penjelasan
async defMendefinisikan coroutine β€” fungsi yang bisa dijalankan secara asinkron
awaitMenunggu coroutine selesai tanpa memblokir event loop
Event LoopMesin eksekusi asyncio yang mengatur kapan tiap coroutine berjalan
TaskCoroutine yang sudah dijadwalkan di event loop
asyncio.gather()Menjalankan beberapa coroutine secara bersamaan
asyncio.sleep()Tidur asinkron β€” tidak memblokir event loop

2.2 β€” Coroutine Pertama

Python β€” async_basics.py
"""
Asyncio Basics β€” Coroutine Pertama
BeebaneLabs - https://beebanelabs.pages.dev
"""

import asyncio
import time


# Coroutine sederhana β€” membaca sensor dengan delay
async def baca_sensor(nama, delay):
    """Simulasi pembacaan sensor yang membutuhkan waktu."""
    print(f"[{nama}] Mulai membaca...")
    await asyncio.sleep(delay)  # TIDAK memblokir event loop
    nilai = 25.5  # Simulasi hasil baca
    print(f"[{nama}] Selesai: {nilai}Β°C (butuh {delay}s)")
    return nilai


# Menjalankan beberapa coroutine SEKALIGUS
async def main():
    print("=== Menjalankan 3 sensor secara bersamaan ===")
    start = time.time()

    # gather() menjalankan semua coroutine concurrently
    hasil = await asyncio.gather(
        baca_sensor("DHT22-1", 2),
        baca_sensor("DHT22-2", 3),
        baca_sensor("DHT22-3", 1),
    )

    elapsed = time.time() - start
    print(f"\nSemua selesai dalam {elapsed:.1f}s (bukan {2+3+1}s!)")
    print(f"Hasil: {hasil}")


# Menjalankan event loop
asyncio.run(main())
Output: === Menjalankan 3 sensor secara bersamaan === [DHT22-1] Mulai membaca... [DHT22-2] Mulai membaca... [DHT22-3] Mulai membaca... [DHT22-3] Selesai: 25.5Β°C (butuh 1s) [DHT22-1] Selesai: 25.5Β°C (butuh 2s) [DHT22-2] Selesai: 25.5Β°C (butuh 3s) Semua selesai dalam 3.0s (bukan 6s!) Hasil: [25.5, 25.5, 25.5]
⚠️ Peringatan

await hanya bisa digunakan di dalam async def. Jika Anda mencoba menggunakan await di fungsi biasa, Python akan mengeluarkan SyntaxError. Selain itu, jangan pernah menggunakan time.sleep() dalam kode async β€” gunakan selalu asyncio.sleep().

3. async / await dalam Praktik

Keyword async dan await adalah jantung dari pemrograman asinkron Python. Mari kita eksplorasi penggunaannya dalam berbagai skenario IoT.

3.1 β€” Async Context Manager

Python β€” async_context.py
"""
Async Context Manager & Timeout
BeebaneLabs - https://beebanelabs.pages.dev
"""

import asyncio


class AsyncSensorReader:
    """Async context manager untuk koneksi sensor."""

    def __init__(self, nama, port):
        self.nama = nama
        self.port = port
        self._connected = False

    async def __aenter__(self):
        """Dipanggil saat masuk ke 'async with'."""
        print(f"[{self.nama}] Menghubungkan ke port {self.port}...")
        await asyncio.sleep(0.5)  # Simulasi waktu koneksi
        self._connected = True
        print(f"[{self.nama}] Terhubung!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Dipanggil saat keluar dari 'async with'."""
        self._connected = False
        print(f"[{self.nama}] Koneksi ditutup")

    async def baca(self, tipe="suhu"):
        if not self._connected:
            raise RuntimeError("Sensor belum terhubung!")
        await asyncio.sleep(0.2)  # Simulasi waktu baca
        if tipe == "suhu":
            return 28.5
        elif tipe == "kelembaban":
            return 65.2
        return None


async def main():
    # Async context manager β€” otomatis connect & disconnect
    async with AsyncSensorReader("DHT22", "/dev/ttyUSB0") as sensor:
        suhu = await sensor.baca("suhu")
        kelembaban = await sensor.baca("kelembaban")
        print(f"Suhu: {suhu}Β°C, Kelembaban: {kelembaban}%")
    # Koneksi otomatis ditutup di sini

    # Timeout β€” batasi waktu tunggu
    print("\n=== Dengan Timeout ===")
    try:
        async with AsyncSensorReader("Sensor Lambat", "/dev/ttyUSB1") as sensor:
            # Timeout 1 detik β€” jika lebih, batalkan
            suhu = await asyncio.wait_for(sensor.baca("suhu"), timeout=1.0)
            print(f"Suhu: {suhu}Β°C")
    except asyncio.TimeoutError:
        print("ERROR: Sensor terlalu lambat β€” timeout!")


asyncio.run(main())

3.2 β€” Task dan Cancel

Python β€” async_tasks.py
"""
Async Tasks β€” Menjalankan & Membatalkan Task
BeebaneLabs - https://beebanelabs.pages.dev
"""

import asyncio


async def monitoring_loop(nama, interval):
    """Loop monitoring yang berjalan terus-menerus."""
    count = 0
    while True:
        count += 1
        print(f"[{nama}] Monitoring #{count}")
        await asyncio.sleep(interval)


async def tugas_terbatas(nama, durasi):
    """Tugas yang berjalan selama durasi tertentu."""
    print(f"[{nama}] Dimulai ({durasi}s)")
    await asyncio.sleep(durasi)
    print(f"[{nama}] Selesai!")
    return f"Hasil dari {nama}"


async def main():
    # Membuat task β€” langsung mulai berjalan
    task1 = asyncio.create_task(tugas_terbatas("Sensor A", 2))
    task2 = asyncio.create_task(tugas_terbatas("Sensor B", 3))
    task3 = asyncio.create_task(monitoring_loop("Heartbeat", 1))

    # Tunggu task1 dan task2 selesai
    hasil = await asyncio.gather(task1, task2)
    print(f"Hasil: {hasil}")

    # Batalkan monitoring loop (tidak akan selesai sendiri)
    task3.cancel()
    try:
        await task3
    except asyncio.CancelledError:
        print("[Heartbeat] Dibatalkan")


asyncio.run(main())
Output: [Sensor A] Dimulai (2s) [Sensor B] Dimulai (3s) [Heartbeat] Monitoring #1 [Heartbeat] Monitoring #2 [Sensor A] Selesai! [Heartbeat] Monitoring #3 [Sensor B] Selesai! Hasil: ['Hasil dari Sensor A', 'Hasil dari Sensor B'] [Heartbeat] Dibatalkan

4. Memahami Event Loop

Event loop adalah mesin di balik asyncio β€” ia mengatur kapan setiap coroutine berjalan, menunggu, dan dilanjutkan. Memahami event loop membantu Anda menulis kode async yang lebih efisien dan menghindari jebakan umum.

Diagram: Cara Kerja Event Loop
  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
  β”‚              EVENT LOOP                      β”‚
  β”‚                                              β”‚
  β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                               β”‚
  β”‚   β”‚ Task A   │──── await I/O ───┐            β”‚
  β”‚   β”‚ (baca    β”‚                  β”‚            β”‚
  β”‚   β”‚  sensor) β”‚                  β–Ό            β”‚
  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜           β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”        β”‚
  β”‚                          β”‚ Waiting  β”‚        β”‚
  β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”           β”‚ (I/O     β”‚        β”‚
  β”‚   β”‚ Task B   │◄──────────│  pending)β”‚        β”‚
  β”‚   β”‚ (kirim   β”‚  I/O done β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜        β”‚
  β”‚   β”‚  HTTP)   β”‚                               β”‚
  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                               β”‚
  β”‚        β”‚                                     β”‚
  β”‚        β–Ό                                     β”‚
  β”‚   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                               β”‚
  β”‚   β”‚ Task C   β”‚  ◄── Giliran berikutnya       β”‚
  β”‚   β”‚ (simpan  β”‚                               β”‚
  β”‚   β”‚  DB)     β”‚                               β”‚
  β”‚   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                               β”‚
  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Blocking vs Non-Blocking: Kesalahan Umum

Python β€” event_loop_trap.py
"""
Event Loop β€” Blocking vs Non-Blocking
BeebaneLabs - https://beebanelabs.pages.dev
"""

import asyncio
import time


# ❌ SALAH β€” menggunakan time.sleep() (blocking!)
async def salah():
    print("[SALAH] Mulai...")
    time.sleep(3)  # MEMBLOKIR event loop! Semua task terhambat
    print("[SALAH] Selesai")


# βœ… BENAR β€” menggunakan asyncio.sleep() (non-blocking)
async def benar():
    print("[BENAR] Mulai...")
    await asyncio.sleep(3)  # TIDAK memblokir β€” event loop bisa jalan
    print("[BENAR] Selesai")


# Demonstrasi dampak blocking
async def heartbeat():
    """Task yang harus berjalan setiap 1 detik."""
    for i in range(5):
        print(f"  [Heartbeat] detik {i+1}")
        await asyncio.sleep(1)


async def demo_salah():
    print("=== DEMO: DENGAN time.sleep() (SALAH) ===")
    start = time.time()
    await asyncio.gather(
        salah(),       # Ini memblokir!
        heartbeat(),   # Heartbeat terhambat
    )
    print(f"Total: {time.time() - start:.1f}s\n")


async def demo_benar():
    print("=== DEMO: DENGAN asyncio.sleep() (BENAR) ===")
    start = time.time()
    await asyncio.gather(
        benar(),       # Non-blocking
        heartbeat(),   # Heartbeat lancar
    )
    print(f"Total: {time.time() - start:.1f}s")


async def main():
    await demo_salah()
    await demo_benar()


asyncio.run(main())

Solusi untuk Operasi Blocking yang Tidak Bisa Dihindari

Kadang kita harus memanggil library yang blocking (misalnya library C). Gunakan asyncio.to_thread() untuk menjalankannya di thread terpisah:

Python β€” to_thread.py
import asyncio


def baca_sensor_blocking():
    """Operasi blocking dari library C/serial."""
    import time
    time.sleep(2)  # Simulasi blocking I/O
    return 28.5


async def main():
    # Jalankan di thread terpisah agar event loop tidak terblokir
    hasil = await asyncio.to_thread(baca_sensor_blocking)
    print(f"Hasil: {hasil}Β°C")


asyncio.run(main())

5. Async HTTP Requests

Dalam IoT, mengirim data ke server REST API adalah operasi yang sangat umum. Library aiohttp menyediakan HTTP client dan server asinkron yang sangat efisien.

Python β€” async_http.py
"""
Async HTTP Requests dengan aiohttp
BeebaneLabs - https://beebanelabs.pages.dev

Instalasi: pip install aiohttp
"""

import asyncio
import aiohttp
import json
import time


async def kirim_data_sensor(session, url, data):
    """Mengirim data sensor ke server via HTTP POST."""
    try:
        async with session.post(url, json=data) as response:
            status = response.status
            result = await response.text()
            print(f"  [HTTP POST] Status: {status}, Response: {result[:100]}")
            return {"status": status, "data": result}
    except aiohttp.ClientError as e:
        print(f"  [HTTP POST] Error: {e}")
        return None


async def ambil_konfigurasi(session, url):
    """Mengambil konfigurasi dari server via HTTP GET."""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                data = await response.json()
                print(f"  [HTTP GET] Config diterima")
                return data
    except aiohttp.ClientError as e:
        print(f"  [HTTP GET] Error: {e}")
    return None


async def kirim_batch_data(base_url, data_list):
    """Mengirim banyak data sensor sekaligus secara concurrent."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for data in data_list:
            task = kirim_data_sensor(session, f"{base_url}/api/sensor", data)
            tasks.append(task)

        # Kirim semua sekaligus
        hasil = await asyncio.gather(*tasks, return_exceptions=True)
        return hasil


async def main():
    base_url = "https://httpbin.org"  # Testing endpoint

    # Contoh data sensor
    sensors_data = [
        {"sensor": "suhu", "nilai": 28.5, "unit": "Β°C", "lokasi": "ruang-tamu"},
        {"sensor": "kelembaban", "nilai": 65.2, "unit": "%", "lokasi": "ruang-tamu"},
        {"sensor": "gas", "nilai": 450, "unit": "ppm", "lokasi": "dapur"},
    ]

    print("=== Mengirim 3 data sensor secara bersamaan ===")
    start = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [
            kirim_data_sensor(session, f"{base_url}/post", data)
            for data in sensors_data
        ]
        hasil = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print(f"\nSemua terkirim dalam {elapsed:.2f}s")
    print(f"Berhasil: {sum(1 for h in hasil if h)} dari {len(hasil)}")

    # Connection timeout & retry
    print("\n=== Dengan Timeout & Retry ===")
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for attempt in range(3):
            try:
                result = await kirim_data_sensor(
                    session, f"{base_url}/post",
                    {"sensor": "test", "attempt": attempt + 1}
                )
                if result:
                    break
            except asyncio.TimeoutError:
                print(f"  Timeout percobaan {attempt + 1}, retry...")
                await asyncio.sleep(2 ** attempt)  # Exponential backoff


asyncio.run(main())
πŸ’‘ Tips

Gunakan aiohttp.ClientSession() sebagai context manager atau buat satu session untuk seluruh lifecycle aplikasi. Membuat session baru untuk setiap request sangat tidak efisien karena harus membuka koneksi TCP baru setiap kali.

6. Async MQTT

MQTT adalah protokol utama di IoT, dan library aiomqtt (dibangun atas paho-mqtt) memungkinkan kita menggunakan MQTT secara asinkron β€” sangat penting untuk gateway IoT yang perlu menangani banyak topik dan perangkat secara bersamaan.

Python β€” async_mqtt.py
"""
Async MQTT dengan aiomqtt
BeebaneLabs - https://beebanelabs.pages.dev

Instalasi: pip install aiomqtt
"""

import asyncio
import json
import time
from aiomqtt import Client as MQTTClient


BROKER = "localhost"
PORT = 1883


async def subscriber(nama, topics):
    """Async MQTT subscriber β€” mendengarkan beberapa topik."""
    print(f"[{nama}] Menghubungkan ke broker...")
    async with MQTTClient(BROKER, PORT) as client:
        # Subscribe ke semua topik
        for topic in topics:
            await client.subscribe(topic)
            print(f"[{nama}] Subscribe: {topic}")

        # Loop terima pesan
        async for message in client.messages:
            payload = message.payload.decode()
            print(f"[{nama}] {message.topic} β†’ {payload}")

            # Proses berdasarkan topik
            topic_str = str(message.topic)
            if "suhu" in topic_str:
                data = json.loads(payload)
                if data.get("suhu", 0) > 35:
                    print(f"[{nama}] ⚠️ ALERT: Suhu tinggi! {data['suhu']}°C")


async def publisher(nama, topic, interval=3):
    """Async MQTT publisher β€” mengirim data periodik."""
    async with MQTTClient(BROKER, PORT) as client:
        print(f"[{nama}] Publisher aktif, mengirim setiap {interval}s")
        count = 0
        while True:
            count += 1
            payload = json.dumps({
                "suhu": 25 + (count % 10),
                "kelembaban": 50 + (count % 30),
                "timestamp": int(time.time()),
                "sequence": count
            })
            await client.publish(topic, payload, qos=1)
            print(f"[{nama}] Publish #{count}")
            await asyncio.sleep(interval)


async def main():
    """Menjalankan subscriber dan publisher secara bersamaan."""
    print("=== Async MQTT Demo ===")

    # Jalankan subscriber dan publisher concurrently
    await asyncio.gather(
        subscriber("Sub-1", ["home/sensor/#", "home/status/#"]),
        publisher("Pub-1", "home/sensor/suhu", interval=2),
    )


# Jalankan dengan: python async_mqtt.py
# Pastikan MQTT broker (Mosquitto) sudah berjalan
# asyncio.run(main())
print("Uncomment asyncio.run(main()) untuk menjalankan")
⚠️ Peringatan

Untuk menjalankan contoh async MQTT, Anda perlu menginstal aiomqtt (pip install aiomqtt) dan memiliki MQTT broker yang berjalan (misalnya Mosquitto). aiomqtt dibangun di atas paho-mqtt dan mendukung MQTT v3.1.1 dan v5.0.

7. Async Database Queries

Menyimpan data sensor ke database adalah kebutuhan umum di IoT. Library aiosqlite dan asyncpg memungkinkan operasi database tanpa memblokir event loop.

Python β€” async_database.py
"""
Async Database Operations
BeebaneLabs - https://beebanelabs.pages.dev

Instalasi: pip install aiosqlite
"""

import asyncio
import aiosqlite
import time
import json


async def setup_database(db_path):
    """Membuat tabel jika belum ada."""
    async with aiosqlite.connect(db_path) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS sensor_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sensor TEXT NOT NULL,
                nilai REAL NOT NULL,
                satuan TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE INDEX IF NOT EXISTS idx_sensor
            ON sensor_data(sensor)
        """)
        await db.commit()
        print("[DB] Tabel siap")


async def simpan_data(db_path, sensor, nilai, satuan=""):
    """Menyimpan satu data sensor."""
    async with aiosqlite.connect(db_path) as db:
        await db.execute(
            "INSERT INTO sensor_data (sensor, nilai, satuan) VALUES (?, ?, ?)",
            (sensor, nilai, satuan)
        )
        await db.commit()
        print(f"[DB] Disimpan: {sensor} = {nilai}{satuan}")


async def simpan_batch(db_path, data_list):
    """Menyimpan banyak data sekaligus (lebih efisien)."""
    async with aiosqlite.connect(db_path) as db:
        await db.executemany(
            "INSERT INTO sensor_data (sensor, nilai, satuan) VALUES (?, ?, ?)",
            [(d["sensor"], d["nilai"], d["satuan"]) for d in data_list]
        )
        await db.commit()
        print(f"[DB] Batch disimpan: {len(data_list)} data")


async def ambil_data_terakhir(db_path, sensor, limit=10):
    """Mengambil data terakhir dari sensor tertentu."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM sensor_data WHERE sensor = ? ORDER BY id DESC LIMIT ?",
            (sensor, limit)
        ) as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]


async def ambil_statistik(db_path, sensor):
    """Menghitung statistik dari data sensor."""
    async with aiosqlite.connect(db_path) as db:
        async with db.execute("""
            SELECT
                COUNT(*) as jumlah,
                MIN(nilai) as min_val,
                MAX(nilai) as max_val,
                AVG(nilai) as avg_val
            FROM sensor_data
            WHERE sensor = ?
        """, (sensor,)) as cursor:
            row = await cursor.fetchone()
            return {
                "jumlah": row[0],
                "min": row[1],
                "max": row[2],
                "avg": round(row[3], 2) if row[3] else None
            }


async def main():
    db_path = "iot_data.db"

    # Setup
    await setup_database(db_path)

    # Simpan data satu per satu
    await simpan_data(db_path, "suhu", 28.5, "Β°C")
    await simpan_data(db_path, "suhu", 29.1, "Β°C")

    # Simpan batch (lebih cepat untuk banyak data)
    batch_data = [
        {"sensor": "suhu", "nilai": 27.8, "satuan": "Β°C"},
        {"sensor": "suhu", "nilai": 28.2, "satuan": "Β°C"},
        {"sensor": "kelembaban", "nilai": 65.0, "satuan": "%"},
        {"sensor": "kelembaban", "nilai": 68.5, "satuan": "%"},
        {"sensor": "gas", "nilai": 420, "satuan": "ppm"},
    ]
    await simpan_batch(db_path, batch_data)

    # Ambil data terakhir
    print("\n=== Data Terakhir: suhu ===")
    data = await ambil_data_terakhir(db_path, "suhu", limit=5)
    for row in data:
        print(f"  {row['timestamp']}: {row['nilai']}{row['satuan']}")

    # Statistik
    print("\n=== Statistik: suhu ===")
    stats = await ambil_statistik(db_path, "suhu")
    print(f"  Jumlah: {stats['jumlah']}, Min: {stats['min']}, "
          f"Max: {stats['max']}, Avg: {stats['avg']}")


asyncio.run(main())

8. Contoh Nyata: Sistem IoT Async

Mari kita gabungkan semua konsep ke dalam satu sistem IoT yang utuh β€” membaca sensor, mengirim ke server, menyimpan ke database, dan memantau MQTT, semuanya berjalan bersamaan.

Python β€” iot_async_system.py
"""
Sistem IoT Async Lengkap
BeebaneLabs - https://beebanelabs.pages.dev
"""

import asyncio
import json
import time
import random
from datetime import datetime


# === SENSOR READER ===
class AsyncSensorHub:
    """Membaca beberapa sensor secara concurrent."""

    def __init__(self):
        self.sensors = {}

    def tambah_sensor(self, key, read_fn, interval=5):
        self.sensors[key] = {"read": read_fn, "interval": interval}

    async def baca_periodik(self, key, callback):
        """Loop baca sensor secara periodik."""
        sensor = self.sensors[key]
        while True:
            try:
                data = await sensor["read"]()
                await callback(key, data)
            except Exception as e:
                print(f"[{key}] Error: {e}")
            await asyncio.sleep(sensor["interval"])


# === SIMULASI SENSOR ===
async def baca_suhu():
    await asyncio.sleep(0.1)  # Simulasi delay hardware
    return {"suhu": round(random.uniform(22, 38), 1), "satuan": "Β°C"}

async def baca_kelembaban():
    await asyncio.sleep(0.15)
    return {"kelembaban": round(random.uniform(30, 90), 1), "satuan": "%"}

async def baca_gas():
    await asyncio.sleep(0.2)
    return {"gas_ppm": round(random.uniform(200, 1500), 0), "satuan": "ppm"}


# === DATA PROCESSOR ===
class DataProcessor:
    """Memproses dan mendistribusikan data sensor."""

    def __init__(self):
        self.buffer = []
        self.alerts = []

    async def proses(self, key, data):
        """Memproses data dari sensor."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        entry = {"sensor": key, "data": data, "waktu": timestamp}
        self.buffer.append(entry)

        print(f"  [{timestamp}] {key}: {data}")

        # Cek threshold
        await self._cek_threshold(key, data)

    async def _cek_threshold(self, key, data):
        """Cek apakah data melewati threshold."""
        if key == "suhu" and data.get("suhu", 0) > 35:
            alert = f"🚨 {key}: {data['suhu']}°C > 35°C!"
            self.alerts.append(alert)
            print(f"  {alert}")
        elif key == "gas" and data.get("gas_ppm", 0) > 1000:
            alert = f"🚨 {key}: {data['gas_ppm']}ppm > 1000ppm!"
            self.alerts.append(alert)
            print(f"  {alert}")


# === DATA SENDER ===
class AsyncDataSender:
    """Mengirim data ke berbagai tujuan secara async."""

    def __init__(self):
        self.sent_count = 0

    async def kirim_ke_cloud(self, data):
        """Simulasi kirim ke cloud server."""
        await asyncio.sleep(0.3)  # Simulasi network latency
        self.sent_count += 1

    async def simpan_lokal(self, data):
        """Simulasi simpan ke local database."""
        await asyncio.sleep(0.05)


# === MAIN SYSTEM ===
async def main():
    print("=" * 50)
    print("  SISTEM IoT ASYNC β€” BeebaneLabs")
    print("=" * 50)

    # Setup komponen
    hub = AsyncSensorHub()
    processor = DataProcessor()
    sender = AsyncDataSender()

    # Tambah sensor
    hub.tambah_sensor("suhu", baca_suhu, interval=2)
    hub.tambah_sensor("kelembaban", baca_kelembaban, interval=3)
    hub.tambah_sensor("gas", baca_gas, interval=2.5)

    # Callback: proses + kirim
    async def on_sensor_data(key, data):
        await processor.proses(key, data)
        await sender.kirim_ke_cloud({"sensor": key, **data})

    # Jalankan semua sensor secara bersamaan
    tasks = [
        asyncio.create_task(hub.baca_periodik(key, on_sensor_data))
        for key in hub.sensors
    ]

    # Tambah task monitoring
    async def status_reporter():
        while True:
            await asyncio.sleep(10)
            print(f"\nπŸ“Š Status: {sender.sent_count} data terkirim, "
                  f"{len(processor.alerts)} alert\n")

    tasks.append(asyncio.create_task(status_reporter()))

    # Jalankan selama 15 detik
    print("\nπŸš€ Sistem berjalan...\n")
    await asyncio.sleep(15)

    # Cleanup
    for task in tasks:
        task.cancel()

    print(f"\n{'=' * 50}")
    print(f"  Selesai! Total: {sender.sent_count} data terkirim")
    print(f"  Alerts: {len(processor.alerts)}")
    print(f"{'=' * 50}")


asyncio.run(main())
Diagram: Arsitektur Sistem IoT Async
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                 EVENT LOOP (asyncio)                   β”‚
β”‚                                                        β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ Sensor Suhu β”‚  β”‚ Sensor Lemb β”‚  β”‚ Sensor Gas   β”‚  β”‚
β”‚  β”‚  (setiap 2s)β”‚  β”‚  (setiap 3s)β”‚  β”‚  (setiap 2.5sβ”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚         β”‚                β”‚                β”‚           β”‚
β”‚         β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜           β”‚
β”‚                  β–Ό                β–Ό                    β”‚
β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”            β”‚
β”‚         β”‚ DataProcessorβ”‚  β”‚ StatusReport β”‚            β”‚
β”‚         β”‚ (threshold)  β”‚  β”‚ (setiap 10s) β”‚            β”‚
β”‚         β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜            β”‚
β”‚                β”‚                                       β”‚
β”‚         β”Œβ”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”                               β”‚
β”‚         β–Ό              β–Ό                               β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                         β”‚
β”‚  β”‚ Kirim Cloud β”‚  β”‚ Simpan DBβ”‚                         β”‚
β”‚  β”‚  (async)    β”‚  β”‚  (async) β”‚                         β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                         β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

9. Quiz: Uji Pemahamanmu!

Setelah membaca tutorial di atas, jawablah 5 pertanyaan berikut untuk menguji pemahamanmu tentang async Python untuk IoT:

Pertanyaan 1: Apa yang terjadi jika menggunakan time.sleep() di dalam coroutine async?

a) Tidak ada efek, program tetap async
b) Event loop terblokir dan semua task terhambat
c) Python otomatis mengubahnya ke asyncio.sleep()
d) Menghasilkan SyntaxError

Pertanyaan 2: Fungsi apa yang digunakan untuk menjalankan beberapa coroutine secara bersamaan?

a) asyncio.run_all()
b) asyncio.gather()
c) asyncio.parallel()
d) asyncio.concurrent()

Pertanyaan 3: Library apa yang digunakan untuk HTTP requests asinkron di Python?

a) requests
b) urllib
c) aiohttp
d) httpx

Pertanyaan 4: Bagaimana cara menjalankan fungsi blocking (sinkron) di dalam async code tanpa memblokir event loop?

a) Menggunakan asyncio.sleep() sebelum fungsi blocking
b) Menggunakan asyncio.to_thread()
c) Menambah parameter async di depan def
d) Tidak bisa, harus rewrite semua ke async

Pertanyaan 5: Apa fungsi asyncio.create_task()?

a) Menjadwalkan coroutine untuk dijalankan di background
b) Membuat thread baru untuk menjalankan kode
c) Menghentikan event loop
d) Menghapus coroutine dari antrean eksekusi