1. Sync vs Async: The Fundamental Difference
In IoT, a device often has to do many things at once: read sensors, send data to a server, receive commands over MQTT, write to a database, and monitor the network connection. If all of this runs synchronously (one step after another), the system becomes slow and unresponsive.
Asynchronous programming lets a program "wait" on slow operations (network, I/O) without blocking everything else. While one operation is waiting, others can run.
| Step | SYNCHRONOUS (sequential) | ASYNCHRONOUS (concurrent) |
|---|---|---|
| 1 | Read sensor | Read sensor |
| 2 | Send HTTP | Send HTTP |
| 3 | Wait... | Wait... |
| 4 | Receive response | Read sensor 2 |
| 5 | Read sensor 2 | Receive response |
| 6 | Save to DB | Save to DB |
| 7 | Wait... | Wait... |
| 8 | Receive response | Receive response |
| Total | ~8 seconds | ~3 seconds |
When Should You Use Async?
| Situation | Recommendation |
|---|---|
| Many I/O operations (HTTP, MQTT, database) | Use async |
| Many sensors must be handled concurrently | Use async |
| IoT server must handle many clients | Use async |
| CPU-intensive work (image processing, ML) | Use multiprocessing instead |
| Simple program, one task at a time | Synchronous code is enough |
| One-shot script (read once, send, done) | Synchronous code is enough |
Async is not "faster" than synchronous code in terms of per-line execution speed. Its advantage is that waiting time gets used: while one operation waits on I/O, another can start. This matters a great deal in IoT, where many sensors and network connections are active at once.
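To make the point about waiting time concrete, here is a minimal sketch that times the same three simulated I/O calls twice: once awaited one after another, once started together with `asyncio.gather()`. The helper `fake_io` and the delays are assumptions made up for illustration; real sensor or network calls would take their place.

```python
import asyncio
import time


async def fake_io(name: str, delay: float) -> str:
    """Stand-in for a slow I/O call (hypothetical helper)."""
    await asyncio.sleep(delay)
    return name


async def run_sequential(delays: list[float]) -> float:
    """Await each operation one after another; return elapsed seconds."""
    start = time.perf_counter()
    for i, d in enumerate(delays):
        await fake_io(f"op-{i}", d)
    return time.perf_counter() - start


async def run_concurrent(delays: list[float]) -> float:
    """Start all operations together with gather(); return elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(f"op-{i}", d) for i, d in enumerate(delays)))
    return time.perf_counter() - start


async def main() -> None:
    delays = [0.1, 0.2, 0.3]
    seq = await run_sequential(delays)
    con = await run_concurrent(delays)
    print(f"sequential: {seq:.2f}s, concurrent: {con:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The sequential run takes roughly the sum of the delays, while the concurrent run takes roughly the longest single delay. No individual call got faster; the waits simply overlap.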
2. Asyncio Basics
asyncio is part of the Python standard library (since Python 3.4; the async/await syntax followed in Python 3.5) and provides the infrastructure for writing asynchronous code. It is the foundation of the Python async ecosystem: almost all modern async libraries (aiohttp, aiomqtt, aiosqlite) are built on top of asyncio.
2.1 - Basic Concepts
| Concept | Explanation |
|---|---|
| `async def` | Defines a coroutine: a function that can run asynchronously |
| `await` | Suspends the current coroutine until the awaited operation finishes, without blocking the event loop |
| Event Loop | The asyncio execution engine that decides when each coroutine runs |
| Task | A coroutine that has been scheduled on the event loop |
| `asyncio.gather()` | Runs several coroutines concurrently and collects their results |
| `asyncio.sleep()` | Asynchronous sleep: does not block the event loop |
2.2 - First Coroutine
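The difference between a coroutine and a Task in the table is easy to miss, so here is a small sketch of it. Calling an `async def` function only creates a coroutine object; nothing runs until it is awaited or wrapped in a Task. The function name `baca_nilai` is a hypothetical placeholder.

```python
import asyncio
import inspect


async def baca_nilai(nilai: float) -> float:
    """Hypothetical read that takes a short moment."""
    await asyncio.sleep(0.01)
    return nilai


async def main() -> dict:
    info = {}

    coro = baca_nilai(1.0)                      # nothing has run yet
    info["is_coroutine"] = inspect.iscoroutine(coro)
    info["from_await"] = await coro             # runs to completion here

    task = asyncio.create_task(baca_nilai(2.0)) # scheduled on the event loop
    info["is_task"] = isinstance(task, asyncio.Task)
    info["done_before_await"] = task.done()     # not started yet at this point
    info["from_task"] = await task
    info["done_after_await"] = task.done()
    return info


if __name__ == "__main__":
    print(asyncio.run(main()))
```

A Task starts running as soon as the current coroutine yields to the event loop, which is why `create_task()` is the usual way to start background work.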
"""
Asyncio Basics - First Coroutine
BeebaneLabs - https://beebanelabs.pages.dev
"""
import asyncio
import time


# A simple coroutine: reads a sensor with a delay
async def baca_sensor(nama, delay):
    """Simulate a sensor read that takes some time."""
    print(f"[{nama}] Starting read...")
    await asyncio.sleep(delay)  # does NOT block the event loop
    nilai = 25.5  # simulated reading
    print(f"[{nama}] Done: {nilai}°C (took {delay}s)")
    return nilai


# Run several coroutines AT ONCE
async def main():
    print("=== Running 3 sensors concurrently ===")
    start = time.time()
    # gather() runs all coroutines concurrently
    hasil = await asyncio.gather(
        baca_sensor("DHT22-1", 2),
        baca_sensor("DHT22-2", 3),
        baca_sensor("DHT22-3", 1),
    )
    elapsed = time.time() - start
    print(f"\nAll done in {elapsed:.1f}s (not {2+3+1}s!)")
    print(f"Results: {hasil}")


# Start the event loop
asyncio.run(main())
await can only be used inside async def. If you use await in a regular function, Python raises a SyntaxError. Also, never call time.sleep() in async code; always use asyncio.sleep().
3. async / await in Practice
The async and await keywords are the heart of asynchronous programming in Python. Let's explore how they are used in several IoT scenarios.
3.1 - Async Context Manager
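The SyntaxError mentioned above is raised when the code is compiled, before anything runs. The sketch below checks this with the built-in `compile()`; the two source strings and the helper `compiles` are made up for illustration.

```python
# The same body, once in a regular function and once in a coroutine
SOURCE_PLAIN = "def f():\n    await asyncio.sleep(1)\n"
SOURCE_ASYNC = "async def f():\n    await asyncio.sleep(1)\n"


def compiles(source: str) -> bool:
    """Return True if the source compiles, False if it raises SyntaxError."""
    try:
        compile(source, "<example>", "exec")
        return True
    except SyntaxError:
        return False


if __name__ == "__main__":
    print("plain def with await compiles:", compiles(SOURCE_PLAIN))
    print("async def with await compiles:", compiles(SOURCE_ASYNC))
```

Because the error shows up at compile time, a misplaced `await` is caught as soon as the module is imported rather than deep into a long-running deployment.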
"""
Async Context Manager & Timeout
BeebaneLabs - https://beebanelabs.pages.dev
"""
import asyncio


class AsyncSensorReader:
    """Async context manager for a sensor connection."""

    def __init__(self, nama, port):
        self.nama = nama
        self.port = port
        self._connected = False

    async def __aenter__(self):
        """Called on entering 'async with'."""
        print(f"[{self.nama}] Connecting to port {self.port}...")
        await asyncio.sleep(0.5)  # simulated connection time
        self._connected = True
        print(f"[{self.nama}] Connected!")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Called on leaving 'async with'."""
        self._connected = False
        print(f"[{self.nama}] Connection closed")

    async def baca(self, tipe="suhu"):
        if not self._connected:
            raise RuntimeError("Sensor is not connected!")
        await asyncio.sleep(0.2)  # simulated read time
        if tipe == "suhu":
            return 28.5
        elif tipe == "kelembaban":
            return 65.2
        return None


async def main():
    # Async context manager: connects and disconnects automatically
    async with AsyncSensorReader("DHT22", "/dev/ttyUSB0") as sensor:
        suhu = await sensor.baca("suhu")
        kelembaban = await sensor.baca("kelembaban")
        print(f"Temperature: {suhu}°C, Humidity: {kelembaban}%")
    # The connection is closed automatically here

    # Timeout: limit how long we wait
    print("\n=== With Timeout ===")
    try:
        async with AsyncSensorReader("Slow Sensor", "/dev/ttyUSB1") as sensor:
            # 1-second timeout: the read is cancelled if it takes longer.
            # The simulated read takes 0.2s, so it does not fire in this run.
            suhu = await asyncio.wait_for(sensor.baca("suhu"), timeout=1.0)
            print(f"Temperature: {suhu}°C")
    except asyncio.TimeoutError:
        print("ERROR: sensor too slow, timed out!")


asyncio.run(main())
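In the listing above the simulated read finishes in 0.2 seconds, so the 1-second timeout never triggers. The sketch below shows the timeout path actually being taken. `baca_lambat` and `baca_dengan_timeout` are hypothetical helpers, and returning a fallback value is just one possible policy.

```python
import asyncio


async def baca_lambat(delay: float) -> float:
    """Hypothetical slow sensor read."""
    await asyncio.sleep(delay)
    return 28.5


async def baca_dengan_timeout(delay: float, timeout: float, fallback=None):
    """Return the reading, or `fallback` if the read exceeds `timeout`."""
    try:
        return await asyncio.wait_for(baca_lambat(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for() cancels the inner coroutine before raising
        return fallback


async def main() -> None:
    print("fast read:", await baca_dengan_timeout(0.01, timeout=0.5))
    print("slow read:", await baca_dengan_timeout(0.5, timeout=0.05, fallback=-1.0))


if __name__ == "__main__":
    asyncio.run(main())
```

Whether to return a fallback, retry, or re-raise depends on the device; for a safety-critical sensor, silently substituting a value is probably the wrong choice.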
3.2 - Tasks and Cancellation
"""
Async Tasks - Running & Cancelling Tasks
BeebaneLabs - https://beebanelabs.pages.dev
"""
import asyncio


async def monitoring_loop(nama, interval):
    """Monitoring loop that runs forever."""
    count = 0
    while True:
        count += 1
        print(f"[{nama}] Monitoring #{count}")
        await asyncio.sleep(interval)


async def tugas_terbatas(nama, durasi):
    """Task that runs for a fixed duration."""
    print(f"[{nama}] Started ({durasi}s)")
    await asyncio.sleep(durasi)
    print(f"[{nama}] Done!")
    return f"Result from {nama}"


async def main():
    # Creating a task schedules it to start right away
    task1 = asyncio.create_task(tugas_terbatas("Sensor A", 2))
    task2 = asyncio.create_task(tugas_terbatas("Sensor B", 3))
    task3 = asyncio.create_task(monitoring_loop("Heartbeat", 1))

    # Wait for task1 and task2 to finish
    hasil = await asyncio.gather(task1, task2)
    print(f"Results: {hasil}")

    # Cancel the monitoring loop (it never finishes on its own)
    task3.cancel()
    try:
        await task3
    except asyncio.CancelledError:
        print("[Heartbeat] Cancelled")


asyncio.run(main())
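A cancelled task often holds something that must be released, such as a serial port or a socket. As a minimal sketch, a `try`/`finally` inside the task runs its cleanup when `cancel()` is called. The `monitor` coroutine and the `log` list are assumptions for illustration only.

```python
import asyncio


async def monitor(log: list) -> None:
    """Hypothetical monitoring loop that releases its resource when cancelled."""
    log.append("open")
    try:
        while True:
            await asyncio.sleep(0.01)
    finally:
        # Runs on normal exit and on cancellation alike
        log.append("closed")


async def main() -> list:
    log: list = []
    task = asyncio.create_task(monitor(log))
    await asyncio.sleep(0.05)  # let the loop run for a moment
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Cancellation is delivered as a `CancelledError` raised at the task's current `await`, so the `finally` block runs before the caller sees the exception.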
4. Understanding the Event Loop
The event loop is the engine behind asyncio: it decides when each coroutine runs, waits, and resumes. Understanding it helps you write more efficient async code and avoid common pitfalls.
[Diagram: the event loop. Task A (read sensor) reaches an await on I/O and moves to the waiting state (I/O pending). Task B (send HTTP) resumes once its I/O is done. Task C (save to DB) gets the next turn.]
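The turn-taking in the diagram can be observed directly. In this minimal sketch two workers record each step in a shared list and hand control back to the event loop with `asyncio.sleep(0)`. The `worker` function and the `order` list are illustrative assumptions.

```python
import asyncio


async def worker(name: str, steps: int, order: list) -> None:
    for i in range(steps):
        order.append(f"{name}{i}")
        await asyncio.sleep(0)  # yield control back to the event loop


async def main() -> list:
    order: list = []
    await asyncio.gather(worker("A", 2, order), worker("B", 2, order))
    return order


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The steps of A and B alternate rather than A running to completion first. A coroutine only gives up control at an `await`, which is why a blocking call between two awaits stalls every other task.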
Blocking vs Non-Blocking: A Common Mistake
"""
Event Loop - Blocking vs Non-Blocking
BeebaneLabs - https://beebanelabs.pages.dev
"""
import asyncio
import time


# WRONG: uses time.sleep() (blocking!)
async def salah():
    print("[WRONG] Starting...")
    time.sleep(3)  # BLOCKS the event loop! Every other task is held up
    print("[WRONG] Done")


# RIGHT: uses asyncio.sleep() (non-blocking)
async def benar():
    print("[RIGHT] Starting...")
    await asyncio.sleep(3)  # does NOT block: the event loop keeps running
    print("[RIGHT] Done")


# Demonstrate the impact of blocking
async def heartbeat():
    """Task that should tick once per second."""
    for i in range(5):
        print(f"  [Heartbeat] second {i+1}")
        await asyncio.sleep(1)


async def demo_salah():
    print("=== DEMO: WITH time.sleep() (WRONG) ===")
    start = time.time()
    await asyncio.gather(
        salah(),      # this one blocks!
        heartbeat(),  # the heartbeat is held up
    )
    print(f"Total: {time.time() - start:.1f}s\n")


async def demo_benar():
    print("=== DEMO: WITH asyncio.sleep() (RIGHT) ===")
    start = time.time()
    await asyncio.gather(
        benar(),      # non-blocking
        heartbeat(),  # the heartbeat runs smoothly
    )
    print(f"Total: {time.time() - start:.1f}s")


async def main():
    await demo_salah()
    await demo_benar()


asyncio.run(main())
A Solution for Unavoidable Blocking Operations
Sometimes we have to call a blocking library (for example, a C library). Use asyncio.to_thread() (available since Python 3.9) to run it in a separate thread:
import asyncio
import time


def baca_sensor_blocking():
    """Blocking operation from a C/serial library."""
    time.sleep(2)  # simulated blocking I/O
    return 28.5


async def main():
    # Run in a separate thread so the event loop is not blocked
    hasil = await asyncio.to_thread(baca_sensor_blocking)
    print(f"Result: {hasil}°C")


asyncio.run(main())
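`asyncio.to_thread()` also combines with `asyncio.gather()`, so several blocking reads can overlap. The sketch below assumes a hypothetical blocking driver function `baca_blocking` and measures the total time for three reads.

```python
import asyncio
import time


def baca_blocking(nama: str, delay: float) -> str:
    """Hypothetical blocking driver call (for example a serial read)."""
    time.sleep(delay)
    return nama


async def baca_semua(names: list[str], delay: float) -> tuple[list[str], float]:
    """Run one blocking read per name in worker threads; return results and elapsed time."""
    start = time.perf_counter()
    hasil = await asyncio.gather(
        *(asyncio.to_thread(baca_blocking, n, delay) for n in names)
    )
    return list(hasil), time.perf_counter() - start


async def main() -> None:
    hasil, elapsed = await baca_semua(["DHT22-1", "DHT22-2", "DHT22-3"], 0.1)
    print(f"{hasil} in {elapsed:.2f}s")


if __name__ == "__main__":
    asyncio.run(main())
```

The reads run in the default thread pool, so the total is close to a single delay rather than three. This helps with blocking I/O; CPU-heavy work would still contend for the interpreter and is better moved to separate processes.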
5. Async HTTP Requests
In IoT, sending data to a REST API server is a very common operation. The aiohttp library provides an efficient asynchronous HTTP client and server.
"""
Async HTTP Requests with aiohttp
BeebaneLabs - https://beebanelabs.pages.dev
Install: pip install aiohttp
"""
import asyncio
import aiohttp
import time


async def kirim_data_sensor(session, url, data):
    """Send sensor data to the server via HTTP POST."""
    try:
        async with session.post(url, json=data) as response:
            status = response.status
            result = await response.text()
            print(f"  [HTTP POST] Status: {status}, Response: {result[:100]}")
            return {"status": status, "data": result}
    except aiohttp.ClientError as e:
        print(f"  [HTTP POST] Error: {e}")
        return None


async def ambil_konfigurasi(session, url):
    """Fetch configuration from the server via HTTP GET."""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                data = await response.json()
                print("  [HTTP GET] Config received")
                return data
    except aiohttp.ClientError as e:
        print(f"  [HTTP GET] Error: {e}")
    return None  # non-200 status or request error


async def kirim_batch_data(base_url, data_list):
    """Send many sensor readings concurrently."""
    async with aiohttp.ClientSession() as session:
        tasks = []
        for data in data_list:
            task = kirim_data_sensor(session, f"{base_url}/api/sensor", data)
            tasks.append(task)
        # Send everything at once
        hasil = await asyncio.gather(*tasks, return_exceptions=True)
        return hasil


async def main():
    base_url = "https://httpbin.org"  # testing endpoint

    # Sample sensor data
    sensors_data = [
        {"sensor": "suhu", "nilai": 28.5, "unit": "°C", "lokasi": "ruang-tamu"},
        {"sensor": "kelembaban", "nilai": 65.2, "unit": "%", "lokasi": "ruang-tamu"},
        {"sensor": "gas", "nilai": 450, "unit": "ppm", "lokasi": "dapur"},
    ]

    print("=== Sending 3 sensor readings concurrently ===")
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [
            kirim_data_sensor(session, f"{base_url}/post", data)
            for data in sensors_data
        ]
        hasil = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print(f"\nAll sent in {elapsed:.2f}s")
    print(f"Succeeded: {sum(1 for h in hasil if h)} of {len(hasil)}")

    # Connection timeout & retry
    print("\n=== With Timeout & Retry ===")
    timeout = aiohttp.ClientTimeout(total=5)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for attempt in range(3):
            try:
                result = await kirim_data_sensor(
                    session, f"{base_url}/post",
                    {"sensor": "test", "attempt": attempt + 1}
                )
                if result:
                    break
            except asyncio.TimeoutError:
                print(f"  Timeout on attempt {attempt + 1}, retrying...")
            # Back off after a timeout or a failed request (result is None)
            await asyncio.sleep(2 ** attempt)  # exponential backoff


asyncio.run(main())
Use aiohttp.ClientSession() as a context manager, or create a single session for the whole lifetime of the application. Creating a new session for every request is very inefficient because a new TCP connection has to be opened each time.
6. Async MQTT
MQTT is a core protocol in IoT, and the aiomqtt library (built on paho-mqtt) lets us use MQTT asynchronously. This is especially important for IoT gateways that must handle many topics and devices at once.
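The retry loop in the listing above can be factored into a small reusable helper. The sketch below is deliberately independent of aiohttp so it runs without network access: `retry` and `FlakyServer` are hypothetical names, and with aiohttp you would likely add `aiohttp.ClientError` to the caught exceptions.

```python
import asyncio


async def retry(make_call, attempts: int = 3, base_delay: float = 0.01):
    """Call `make_call()` until it succeeds or `attempts` is exhausted.

    `make_call` is a zero-argument function returning a fresh coroutine,
    because a coroutine object can only be awaited once.
    """
    for attempt in range(attempts):
        try:
            return await make_call()
        except (ConnectionError, asyncio.TimeoutError):
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller see the error
            await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff


class FlakyServer:
    """Hypothetical endpoint that fails the first `fail_times` calls."""

    def __init__(self, fail_times: int):
        self.fail_times = fail_times
        self.calls = 0

    async def post(self, data: dict) -> dict:
        self.calls += 1
        if self.calls <= self.fail_times:
            raise ConnectionError("simulated network failure")
        return {"status": 200, "data": data}


async def main() -> None:
    server = FlakyServer(fail_times=2)
    result = await retry(lambda: server.post({"sensor": "test"}))
    print(result, "after", server.calls, "calls")


if __name__ == "__main__":
    asyncio.run(main())
```

Re-raising after the last attempt keeps the failure visible to the caller. A real device would probably also cap the maximum delay and add some random jitter so that many devices do not retry in lockstep.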
"""
Async MQTT with aiomqtt
BeebaneLabs - https://beebanelabs.pages.dev
Install: pip install aiomqtt
"""
import asyncio
import json
import time
from aiomqtt import Client as MQTTClient

BROKER = "localhost"
PORT = 1883


async def subscriber(nama, topics):
    """Async MQTT subscriber: listens on several topics."""
    print(f"[{nama}] Connecting to broker...")
    async with MQTTClient(BROKER, PORT) as client:
        # Subscribe to every topic
        for topic in topics:
            await client.subscribe(topic)
            print(f"[{nama}] Subscribe: {topic}")
        # Message receive loop
        async for message in client.messages:
            payload = message.payload.decode()
            print(f"[{nama}] {message.topic} -> {payload}")
            # Handle the message based on its topic
            topic_str = str(message.topic)
            if "suhu" in topic_str:
                data = json.loads(payload)
                if data.get("suhu", 0) > 35:
                    print(f"[{nama}] ALERT: high temperature! {data['suhu']}°C")


async def publisher(nama, topic, interval=3):
    """Async MQTT publisher: sends data periodically."""
    async with MQTTClient(BROKER, PORT) as client:
        print(f"[{nama}] Publisher active, sending every {interval}s")
        count = 0
        while True:
            count += 1
            payload = json.dumps({
                "suhu": 25 + (count % 10),
                "kelembaban": 50 + (count % 30),
                "timestamp": int(time.time()),
                "sequence": count
            })
            await client.publish(topic, payload, qos=1)
            print(f"[{nama}] Publish #{count}")
            await asyncio.sleep(interval)


async def main():
    """Run the subscriber and the publisher concurrently."""
    print("=== Async MQTT Demo ===")
    # Run subscriber and publisher concurrently
    await asyncio.gather(
        subscriber("Sub-1", ["home/sensor/#", "home/status/#"]),
        publisher("Pub-1", "home/sensor/suhu", interval=2),
    )


# Run with: python async_mqtt.py
# Make sure an MQTT broker (Mosquitto) is already running
# asyncio.run(main())
print("Uncomment asyncio.run(main()) to run the demo")
To run the async MQTT example, you need to install aiomqtt (pip install aiomqtt) and have an MQTT broker running (for example Mosquitto). aiomqtt is built on top of paho-mqtt and supports MQTT v3.1.1 and v5.0.
7. Async Database Queries
Storing sensor data in a database is a common requirement in IoT. The aiosqlite (SQLite) and asyncpg (PostgreSQL) libraries allow database operations without blocking the event loop.
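The subscriber above uses the `#` wildcard in `home/sensor/#`. As a rough sketch of how MQTT topic filters are matched, the function below handles the two wildcards: `+` stands for exactly one level and `#` for all remaining levels. `topic_matches` is a hypothetical helper written for illustration, not part of aiomqtt, and it ignores special cases such as topics beginning with `$`.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` matches the MQTT subscription `pattern`."""
    p_levels = pattern.split("/")
    t_levels = topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":
            # Multi-level wildcard: matches the parent level and everything below it
            return True
        if i >= len(t_levels):
            return False  # the topic has fewer levels than the pattern
        if p != "+" and p != t_levels[i]:
            return False  # literal level differs
    # No '#' seen: both must have the same number of levels
    return len(p_levels) == len(t_levels)


if __name__ == "__main__":
    for t in ["home/sensor/suhu", "home/sensor", "home/status/online"]:
        print(t, topic_matches("home/sensor/#", t))
```

In practice the broker performs this matching. A local check like this is mostly useful for routing messages inside one subscriber that listens on several filters.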
"""
Async Database Operations
BeebaneLabs - https://beebanelabs.pages.dev
Install: pip install aiosqlite
"""
import asyncio
import aiosqlite


async def setup_database(db_path):
    """Create the table if it does not exist yet."""
    async with aiosqlite.connect(db_path) as db:
        await db.execute("""
            CREATE TABLE IF NOT EXISTS sensor_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                sensor TEXT NOT NULL,
                nilai REAL NOT NULL,
                satuan TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        await db.execute("""
            CREATE INDEX IF NOT EXISTS idx_sensor
            ON sensor_data(sensor)
        """)
        await db.commit()
    print("[DB] Table ready")


async def simpan_data(db_path, sensor, nilai, satuan=""):
    """Store a single sensor reading."""
    async with aiosqlite.connect(db_path) as db:
        await db.execute(
            "INSERT INTO sensor_data (sensor, nilai, satuan) VALUES (?, ?, ?)",
            (sensor, nilai, satuan)
        )
        await db.commit()
    print(f"[DB] Saved: {sensor} = {nilai}{satuan}")


async def simpan_batch(db_path, data_list):
    """Store many readings at once (more efficient)."""
    async with aiosqlite.connect(db_path) as db:
        await db.executemany(
            "INSERT INTO sensor_data (sensor, nilai, satuan) VALUES (?, ?, ?)",
            [(d["sensor"], d["nilai"], d["satuan"]) for d in data_list]
        )
        await db.commit()
    print(f"[DB] Batch saved: {len(data_list)} rows")


async def ambil_data_terakhir(db_path, sensor, limit=10):
    """Fetch the most recent readings for a given sensor."""
    async with aiosqlite.connect(db_path) as db:
        db.row_factory = aiosqlite.Row
        async with db.execute(
            "SELECT * FROM sensor_data WHERE sensor = ? ORDER BY id DESC LIMIT ?",
            (sensor, limit)
        ) as cursor:
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]


async def ambil_statistik(db_path, sensor):
    """Compute statistics over a sensor's readings."""
    async with aiosqlite.connect(db_path) as db:
        async with db.execute("""
            SELECT
                COUNT(*) as jumlah,
                MIN(nilai) as min_val,
                MAX(nilai) as max_val,
                AVG(nilai) as avg_val
            FROM sensor_data
            WHERE sensor = ?
        """, (sensor,)) as cursor:
            row = await cursor.fetchone()
            return {
                "jumlah": row[0],
                "min": row[1],
                "max": row[2],
                # Compare with None so that an average of 0.0 is kept
                "avg": round(row[3], 2) if row[3] is not None else None
            }


async def main():
    db_path = "iot_data.db"

    # Setup
    await setup_database(db_path)

    # Store readings one at a time
    await simpan_data(db_path, "suhu", 28.5, "°C")
    await simpan_data(db_path, "suhu", 29.1, "°C")

    # Store a batch (faster for many rows)
    batch_data = [
        {"sensor": "suhu", "nilai": 27.8, "satuan": "°C"},
        {"sensor": "suhu", "nilai": 28.2, "satuan": "°C"},
        {"sensor": "kelembaban", "nilai": 65.0, "satuan": "%"},
        {"sensor": "kelembaban", "nilai": 68.5, "satuan": "%"},
        {"sensor": "gas", "nilai": 420, "satuan": "ppm"},
    ]
    await simpan_batch(db_path, batch_data)

    # Fetch the latest readings
    print("\n=== Latest Data: suhu ===")
    data = await ambil_data_terakhir(db_path, "suhu", limit=5)
    for row in data:
        print(f"  {row['timestamp']}: {row['nilai']}{row['satuan']}")

    # Statistics
    print("\n=== Statistics: suhu ===")
    stats = await ambil_statistik(db_path, "suhu")
    print(f"  Count: {stats['jumlah']}, Min: {stats['min']}, "
          f"Max: {stats['max']}, Avg: {stats['avg']}")


asyncio.run(main())
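Since batch inserts are cheaper than single inserts, one common pattern is to buffer readings in an `asyncio.Queue` and write them out in groups. The sketch below shows only the batching logic. `batch_writer`, the `None` shutdown signal, and the in-memory `flush` function (standing in for a call such as `simpan_batch`) are assumptions for illustration.

```python
import asyncio


async def batch_writer(queue: asyncio.Queue, flush, batch_size: int = 3) -> None:
    """Collect readings from `queue` and pass them to `flush` in batches.

    A `None` item is a (hypothetical) shutdown signal: flush what is left and stop.
    """
    batch = []
    while True:
        item = await queue.get()
        if item is None:
            break
        batch.append(item)
        if len(batch) >= batch_size:
            await flush(batch)
            batch = []
    if batch:
        await flush(batch)  # write the final partial batch


async def main() -> list:
    flushed = []

    async def flush(rows):  # stand-in for a real batch insert
        flushed.append(list(rows))

    queue: asyncio.Queue = asyncio.Queue()
    writer = asyncio.create_task(batch_writer(queue, flush))
    for i in range(7):
        await queue.put({"sensor": "suhu", "nilai": 25.0 + i, "satuan": "°C"})
    await queue.put(None)  # signal shutdown
    await writer
    return flushed


if __name__ == "__main__":
    print([len(b) for b in asyncio.run(main())])
```

This version flushes only when a batch fills up or on shutdown. A real logger would likely also flush on a timer so that readings do not sit in memory indefinitely when data arrives slowly.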
8. Real-World Example: An Async IoT System
Let's combine all of these concepts into one complete IoT system: reading sensors, sending to a server, storing to a database, and reporting status, all running concurrently.
"""
Complete Async IoT System
BeebaneLabs - https://beebanelabs.pages.dev
"""
import asyncio
import random
from datetime import datetime


# === SENSOR READER ===
class AsyncSensorHub:
    """Reads several sensors concurrently."""

    def __init__(self):
        self.sensors = {}

    def tambah_sensor(self, key, read_fn, interval=5):
        self.sensors[key] = {"read": read_fn, "interval": interval}

    async def baca_periodik(self, key, callback):
        """Periodic read loop for one sensor."""
        sensor = self.sensors[key]
        while True:
            try:
                data = await sensor["read"]()
                await callback(key, data)
            except Exception as e:
                print(f"[{key}] Error: {e}")
            await asyncio.sleep(sensor["interval"])


# === SIMULATED SENSORS ===
async def baca_suhu():
    await asyncio.sleep(0.1)  # simulated hardware delay
    return {"suhu": round(random.uniform(22, 38), 1), "satuan": "°C"}


async def baca_kelembaban():
    await asyncio.sleep(0.15)
    return {"kelembaban": round(random.uniform(30, 90), 1), "satuan": "%"}


async def baca_gas():
    await asyncio.sleep(0.2)
    return {"gas_ppm": round(random.uniform(200, 1500), 0), "satuan": "ppm"}


# === DATA PROCESSOR ===
class DataProcessor:
    """Processes and distributes sensor data."""

    def __init__(self):
        self.buffer = []
        self.alerts = []

    async def proses(self, key, data):
        """Process one reading from a sensor."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        entry = {"sensor": key, "data": data, "waktu": timestamp}
        self.buffer.append(entry)
        print(f"  [{timestamp}] {key}: {data}")
        # Check thresholds
        await self._cek_threshold(key, data)

    async def _cek_threshold(self, key, data):
        """Check whether the reading exceeds its threshold."""
        if key == "suhu" and data.get("suhu", 0) > 35:
            alert = f"ALERT {key}: {data['suhu']}°C > 35°C!"
            self.alerts.append(alert)
            print(f"  {alert}")
        elif key == "gas" and data.get("gas_ppm", 0) > 1000:
            alert = f"ALERT {key}: {data['gas_ppm']}ppm > 1000ppm!"
            self.alerts.append(alert)
            print(f"  {alert}")


# === DATA SENDER ===
class AsyncDataSender:
    """Sends data to several destinations asynchronously."""

    def __init__(self):
        self.sent_count = 0

    async def kirim_ke_cloud(self, data):
        """Simulate sending to a cloud server."""
        await asyncio.sleep(0.3)  # simulated network latency
        self.sent_count += 1

    async def simpan_lokal(self, data):
        """Simulate storing to a local database."""
        await asyncio.sleep(0.05)


# === MAIN SYSTEM ===
async def main():
    print("=" * 50)
    print(" ASYNC IoT SYSTEM - BeebaneLabs")
    print("=" * 50)

    # Set up the components
    hub = AsyncSensorHub()
    processor = DataProcessor()
    sender = AsyncDataSender()

    # Register the sensors
    hub.tambah_sensor("suhu", baca_suhu, interval=2)
    hub.tambah_sensor("kelembaban", baca_kelembaban, interval=3)
    hub.tambah_sensor("gas", baca_gas, interval=2.5)

    # Callback: process, then send to the cloud and store locally
    async def on_sensor_data(key, data):
        await processor.proses(key, data)
        record = {"sensor": key, **data}
        await asyncio.gather(
            sender.kirim_ke_cloud(record),
            sender.simpan_lokal(record),
        )

    # Run all sensors concurrently
    tasks = [
        asyncio.create_task(hub.baca_periodik(key, on_sensor_data))
        for key in hub.sensors
    ]

    # Add a monitoring task
    async def status_reporter():
        while True:
            await asyncio.sleep(10)
            print(f"\nStatus: {sender.sent_count} readings sent, "
                  f"{len(processor.alerts)} alerts\n")

    tasks.append(asyncio.create_task(status_reporter()))

    # Run for 15 seconds
    print("\nSystem running...\n")
    await asyncio.sleep(15)

    # Cleanup: cancel every task and wait until they have all stopped
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

    print(f"\n{'=' * 50}")
    print(f" Done! Total: {sender.sent_count} readings sent")
    print(f" Alerts: {len(processor.alerts)}")
    print(f"{'=' * 50}")


asyncio.run(main())
[Diagram: inside the asyncio event loop, three sensor tasks (temperature every 2s, humidity every 3s, gas every 2.5s) feed DataProcessor (threshold checks), whose output goes to "Send to Cloud" (async) and "Save to DB" (async). StatusReport runs separately every 10s.]
9. Quiz: Test Your Understanding!
After reading the tutorial above, answer the following 5 questions to test your understanding of async Python for IoT: