Cybersecurity

Threat Intelligence: Panduan Lengkap Ancaman Siber

Tutorial komprehensif tentang Cyber Threat Intelligence β€” dari sumber data, framework (MITRE ATT&CK, STIX/TAXII), indikator kompromi (IoC), threat hunting, hingga automation dan integrasi dengan SIEM

1. Pengenalan Threat Intelligence

Cyber Threat Intelligence (CTI) adalah informasi yang dikumpulkan, dianalisis, dan diinterpretasikan tentang ancaman siber yang sudah ada, sedang berlangsung, atau yang akan datang. CTI membantu organisasi memahami siapa yang menyerang, mengapa, bagaimana, dan apa yang bisa dilakukan untuk melindungi diri.

Threat Intelligence bukan sekadar daftar IP dan domain berbahaya. CTI yang baik memberikan konteks yang memungkinkan pengambilan keputusan keamanan yang lebih tepat β€” dari strategi investasi keamanan hingga prioritas patch dan konfigurasi.

Tipe Threat Intelligence

Tipe Audiens Contoh Fungsi
StrategicCISO, DireksiLaporan tren ancaman, risiko industriPengambilan keputusan bisnis & investasi keamanan
TacticalSecurity ArchitectTTP adversary, MITRE ATT&CK mappingDesain pertahanan dan arsitektur keamanan
OperationalIncident ResponseKampanye serangan aktif, target industriRespons insiden dan threat hunting
TechnicalSOC AnalystIoC (IP, domain, hash), signaturesDetection, blocking, dan monitoring real-time
Diagram: Threat Intelligence Lifecycle
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚              THREAT INTELLIGENCE LIFECYCLE                      β”‚
β”‚                                                                β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                  β”‚
β”‚  β”‚1. PLAN   │──▢│2. COLLECT│──▢│3. PROCESSβ”‚                  β”‚
β”‚  β”‚          β”‚   β”‚          β”‚   β”‚          β”‚                  β”‚
β”‚  β”‚β€’ Requirementsβ”‚β€’ OSINT   β”‚   β”‚β€’ Normalizeβ”‚                  β”‚
β”‚  β”‚β€’ Priority β”‚   β”‚β€’ Feeds   β”‚   β”‚β€’ Filter  β”‚                  β”‚
β”‚  β”‚β€’ Scope    β”‚   β”‚β€’ Dark webβ”‚   β”‚β€’ Dedupe  β”‚                  β”‚
β”‚  β”‚β€’ Stake-   β”‚   β”‚β€’ Honeypotβ”‚   β”‚β€’ Enrich  β”‚                  β”‚
β”‚  β”‚  holders  β”‚   β”‚β€’ ISAC    β”‚   β”‚          β”‚                  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                  β”‚
β”‚       β–²                                    β”‚                   β”‚
β”‚       β”‚                                    β–Ό                   β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                  β”‚
β”‚  β”‚6. FEEDBACK│◀──│5. DISTRIB│◀──│4. ANALYZEβ”‚                  β”‚
β”‚  β”‚          β”‚   β”‚          β”‚   β”‚          β”‚                  β”‚
β”‚  β”‚β€’ Evaluateβ”‚   β”‚β€’ Alerts  β”‚   β”‚β€’ Context β”‚                  β”‚
β”‚  β”‚β€’ Improve β”‚   β”‚β€’ Reports β”‚   β”‚β€’ Correlate                  β”‚
β”‚  β”‚β€’ Update  β”‚   β”‚β€’ SIEM    β”‚   β”‚β€’ Attributeβ”‚                  β”‚
β”‚  β”‚  require-β”‚   β”‚β€’ TIP     β”‚   β”‚β€’ Predict β”‚                  β”‚
β”‚  β”‚  ments   β”‚   β”‚β€’ Block   β”‚   β”‚          β”‚                  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

2. Framework & Model

2.1 MITRE ATT&CK

MITRE ATT&CK adalah framework global yang mendokumentasikan TTP (Tactics, Techniques, and Procedures) adversary berdasarkan observasi dunia nyata. Framework ini menjadi standar industri untuk berkomunikasi tentang ancaman.

Framework β€” MITRE ATT&CK Structure
# ===== MITRE ATT&CK STRUCTURE =====

# ATT&CK menyusun pengetahuan tentang serangan dalam:
# - 14 Tactics (tujuan/tahap serangan)
# - 200+ Techniques (cara mencapai tujuan)
# - 400+ Sub-techniques (variasi teknik)
# - 800+ Procedures (implementasi spesifik)

# ===== ENTERPRISE TACTICS =====
# TA0043  Reconnaissance         β€” Mengumpulkan info target
# TA0042  Resource Development   β€” Menyiapkan infrastruktur
# TA0001  Initial Access         β€” Mendapatkan akses awal
# TA0002  Execution              β€” Menjalankan kode
# TA0003  Persistence            β€” Mempertahankan akses
# TA0004  Privilege Escalation   β€” Meningkatkan hak akses
# TA0005  Defense Evasion        β€” Menghindari deteksi
# TA0006  Credential Access      β€” Mencuri kredensial
# TA0007  Discovery              β€” Menjelajahi lingkungan
# TA0008  Lateral Movement       β€” Bergerak ke sistem lain
# TA0009  Collection             β€” Mengumpulkan data target
# TA0011  Command and Control    β€” Komunikasi dengan C2
# TA0010  Exfiltration           β€” Mencuri data keluar
# TA0040  Impact                 β€” Merusak atau memanipulasi

# ===== CONTOH MAPPING =====
# APT29 (Cozy Bear) β€” TTP yang terdokumentasi:
# TA0001: T1566.002 (Spearphishing Link)
# TA0002: T1059.001 (PowerShell)
# TA0003: T1547.001 (Registry Run Keys)
# TA0005: T1027 (Obfuscated Files)
# TA0006: T1003.001 (LSASS Memory)
# TA0008: T1021.002 (SMB/Windows Admin Shares)
# TA0011: T1071.001 (Web Protocols)

# ===== MENGGUNAKAN ATT&CK MATRIX =====
# 1. Identifikasi threat actors yang relevan untuk industri Anda
# 2. Mapping TTP mereka ke ATT&CK matrix
# 3. Identifikasi gap dalam pertahanan Anda
# 4. Prioritaskan mitigasi berdasarkan risk

# Online: https://attack.mitre.org/matrices/enterprise/
# Navigator: https://mitre-attack.github.io/attack-navigator/

2.2 Diamond Model & Kill Chain

Framework β€” Diamond Model & Cyber Kill Chain
# ===== CYBER KILL CHAIN (Lockheed Martin) =====

# 7 tahap serangan:
# 1. Reconnaissance    β†’ Target identification
# 2. Weaponization     β†’ Create malicious payload
# 3. Delivery          β†’ Send to victim (email, web, USB)
# 4. Exploitation      β†’ Trigger vulnerability
# 5. Installation      β†’ Install malware/backdoor
# 6. Command & Control β†’ Establish remote access
# 7. Actions on Obj.   β†’ Achieve attacker goal

# Setiap tahap adalah kesempatan untuk DETEKSI dan PENCEGAHAN

# ===== DIAMOND MODEL =====
# Model analisis yang menghubungkan 4 elemen:
#
#     Adversary ←────────→ Capability
#         β”‚                    β”‚
#         β”‚                    β”‚
#     Infrastructure ←────→ Victim
#
# Setiap event serangan memiliki:
# - Adversary: Siapa yang menyerang
# - Capability: Tools/malware yang digunakan
# - Infrastructure: Server/domain C2
# - Victim: Target yang diserang
#
# Dengan menganalisis hubungan ini, kita bisa:
# - Mengatribusi serangan ke kelompok tertentu
# - Memprediksi langkah selanjutnya
# - Menghubungkan beberapa insiden

# ===== CONTOH ANALISIS =====
# Event: Phishing email terdeteksi
#
# Adversary: APT group "XYZ"
#   β†’ TTP: Spearphishing dengan dokumen Office macro
#   β†’ Motivation: Espionage (intellectual property theft)
#
# Capability: Custom RAT "ShadowTrojan"
#   β†’ SHA256: abc123...
#   β†’ C2 protocol: HTTPS dengan custom encryption
#
# Infrastructure:
#   β†’ C2 domain: updates-service[.]com
#   β†’ C2 IP: 185.x.x.x (hosted di bulletproof hosting)
#   β†’ Redirector: Cloudflare Workers
#
# Victim:
#   β†’ Target: R&D department
#   β†’ Entry point: email user@target.com
#   β†’ Compromised host: WORKSTATION-042

3. Sumber Data Threat Intelligence

Reference β€” Threat Intelligence Sources
# ===== SUMBER DATA THREAT INTELLIGENCE =====

# ===== 1. OPEN SOURCE INTELLIGENCE (OSINT) =====

# VirusTotal β€” Analisis file dan URL
# https://www.virustotal.com
# API: cek hash, domain, IP, URL
curl "https://www.virustotal.com/api/v3/ip_addresses/1.2.3.4" \
    -H "x-apikey: YOUR_API_KEY"

# AlienVault OTX β€” Open Threat Exchange
# https://otx.alienvault.com
# Community-driven threat intelligence
# API endpoint: https://otx.alienvault.com/api/v1/indicators/

# AbuseIPDB β€” Database IP berbahaya
# https://www.abuseipdb.com
curl "https://api.abuseipdb.com/api/v2/check?ipAddress=1.2.3.4" \
    -H "Key: YOUR_API_KEY" \
    -H "Accept: application/json"

# URLhaus β€” Database URL malware
# https://urlhaus.abuse.ch/api/

# MalwareBazaar β€” Database sample malware
# https://bazaar.abuse.ch/api/

# PhishTank β€” Database phishing URLs
# https://phishtank.org/developer_info.php

# Shodan β€” Internet-connected device search
# https://www.shodan.io
# Cari exposed services, vulnerable devices

# Censys β€” Internet-wide scanning data
# https://search.censys.io

# ===== 2. COMMUNITY & GOVERNMENT SOURCES =====

# ID-SIRTII/CC (Indonesia)
# https://idsirtii.or.id
# National CSIRT Indonesia β€” alert dan advisory

# CISA (US Cybersecurity & Infrastructure)
# https://www.cisa.gov/cybersecurity-advisories
# Known Exploited Vulnerabilities (KEV) catalog

# MITRE ATT&CK
# https://attack.mitre.org
# TTP database dan mapping

# FIRST β€” Forum of Incident Response Teams
# https://www.first.org

# ===== 3. COMMERCIAL THREAT INTELLIGENCE =====

# Recorded Future β€” AI-powered threat intelligence
# Mandiant Advantage β€” Threat intelligence platform
# CrowdStrike Falcon β€” Threat intelligence feed
# Palo Alto Unit 42 β€” Research & threat intel
# Cisco Talos β€” Threat intelligence team
# Microsoft Threat Intelligence β€” MSTIC

# ===== 4. ISAC (Information Sharing and Analysis Center) =====
# FS-ISAC  β†’ Financial Services
# H-ISAC   β†’ Healthcare
# IT-ISAC   β†’ Information Technology
# E-ISAC    β†’ Energy
# MS-ISAC   β†’ State/Local Government
# Aviation ISAC, Retail ISAC, dll.

# ===== 5. DARK WEB MONITORING =====
# Monitor dark web forum, marketplace, dan paste sites
# untuk data breach, credential leaks, zero-day sales
# Tools: DarkOwl, Flashpoint, Recorded Future

4. Indikator Kompromi (IoC)

Indicators of Compromise (IoC) adalah artefak teknis yang menunjukkan adanya intrusi atau aktivitas berbahaya. IoC adalah output paling langsung dari threat intelligence yang digunakan oleh SOC untuk detection dan blocking.

4.1 Jenis-Jenis IoC

Jenis IoC Contoh Confidence Shelf Life
IP Address185.220.100.252MediumPendek (berubah cepat)
Domainc2-malware[.]comMediumPendek–Sedang
URLhxxps://c2[.]com/payload.dllMedium–HighPendek
File HashSHA256: a1b2c3...HighPermanen (tapi bisa diubah)
Email Addressattacker@phish.comMediumSedang
YARA RulePola byte malwareHighLama
JA3/JA3S HashTLS fingerprintHighLama
Registry KeyHKCU\Software\MalwareHighLama
MutexGlobal\MalwareMutexHighLama
TTPTechnique behaviorHighestPaling lama
Python β€” IoC Collection & Enrichment
# ===== IOC COLLECTION & ENRICHMENT =====

import requests
import hashlib
import json
from datetime import datetime

class ThreatIntelCollector:
    """Collector untuk mengumpulkan dan enrich IoC"""
    
    def __init__(self, vt_api_key=None, otx_key=None, abuse_key=None):
        self.vt_key = vt_api_key
        self.otx_key = otx_key
        self.abuse_key = abuse_key
    
    # ===== VIRUSTOTAL =====
    def check_virustotal(self, indicator, indicator_type='ip'):
        """Cek IoC di VirusTotal"""
        if not self.vt_key:
            return None
        
        headers = {'x-apikey': self.vt_key}
        base = 'https://www.virustotal.com/api/v3'
        
        endpoints = {
            'ip': f'{base}/ip_addresses/{indicator}',
            'domain': f'{base}/domains/{indicator}',
            'hash': f'{base}/files/{indicator}',
            'url': f'{base}/urls/{indicator}'
        }
        
        try:
            resp = requests.get(
                endpoints.get(indicator_type, endpoints['ip']),
                headers=headers
            )
            data = resp.json()
            
            if 'data' in data:
                attrs = data['data']['attributes']
                return {
                    'source': 'VirusTotal',
                    'indicator': indicator,
                    'type': indicator_type,
                    'malicious': attrs.get('last_analysis_stats', {}).get('malicious', 0),
                    'suspicious': attrs.get('last_analysis_stats', {}).get('suspicious', 0),
                    'harmless': attrs.get('last_analysis_stats', {}).get('harmless', 0),
                    'reputation': attrs.get('reputation', 0),
                    'tags': attrs.get('tags', []),
                    'checked_at': datetime.utcnow().isoformat()
                }
        except Exception as e:
            return {'error': str(e)}
    
    # ===== ALIENVAULT OTX =====
    def check_otx(self, indicator, indicator_type='IPv4'):
        """Cek IoC di AlienVault OTX"""
        if not self.otx_key:
            return None
        
        headers = {'X-OTX-API-KEY': self.otx_key}
        base = 'https://otx.alienvault.com/api/v1/indicators'
        
        endpoints = {
            'IPv4': f'{base}/IPv4/{indicator}/general',
            'domain': f'{base}/domain/{indicator}/general',
            'file': f'{base}/file/{indicator}/general',
            'URL': f'{base}/url/{indicator}/general'
        }
        
        try:
            resp = requests.get(
                endpoints.get(indicator_type, endpoints['IPv4']),
                headers=headers
            )
            data = resp.json()
            
            return {
                'source': 'OTX',
                'indicator': indicator,
                'pulse_count': data.get('pulse_info', {}).get('count', 0),
                'reputation': data.get('reputation', 0),
                'country': data.get('country_name', 'Unknown'),
                'checked_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {'error': str(e)}
    
    # ===== ABUSEIPDB =====
    def check_abuseipdb(self, ip):
        """Cek IP di AbuseIPDB"""
        if not self.abuse_key:
            return None
        
        headers = {
            'Key': self.abuse_key,
            'Accept': 'application/json'
        }
        
        try:
            resp = requests.get(
                'https://api.abuseipdb.com/api/v2/check',
                headers=headers,
                params={'ipAddress': ip, 'maxAgeInDays': 90}
            )
            data = resp.json().get('data', {})
            
            return {
                'source': 'AbuseIPDB',
                'indicator': ip,
                'abuse_confidence_score': data.get('abuseConfidenceScore', 0),
                'total_reports': data.get('totalReports', 0),
                'isp': data.get('isp', 'Unknown'),
                'country': data.get('countryCode', 'Unknown'),
                'is_tor': data.get('isTor', False),
                'checked_at': datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {'error': str(e)}
    
    def enrich_ioc(self, indicator, indicator_type='ip'):
        """Enrich IoC dari multiple sources"""
        results = {
            'indicator': indicator,
            'type': indicator_type,
            'enrichments': [],
            'risk_score': 0,
            'analyzed_at': datetime.utcnow().isoformat()
        }
        
        # Query semua source
        if indicator_type == 'ip':
            vt = self.check_virustotal(indicator, 'ip')
            otx = self.check_otx(indicator, 'IPv4')
            abuse = self.check_abuseipdb(indicator)
            
            if vt: results['enrichments'].append(vt)
            if otx: results['enrichments'].append(otx)
            if abuse: results['enrichments'].append(abuse)
        
        # Hitung aggregate risk score
        scores = []
        for e in results['enrichments']:
            if 'malicious' in e:
                scores.append(min(e['malicious'] * 2, 100))
            if 'abuse_confidence_score' in e:
                scores.append(e['abuse_confidence_score'])
            if 'pulse_count' in e and e['pulse_count'] > 0:
                scores.append(min(e['pulse_count'] * 10, 100))
        
        results['risk_score'] = int(sum(scores) / max(len(scores), 1))
        
        return results

# ===== USAGE =====
# collector = ThreatIntelCollector(
#     vt_api_key='YOUR_VT_KEY',
#     otx_key='YOUR_OTX_KEY',
#     abuse_key='YOUR_ABUSE_KEY'
# )
#
# # Enrich IP address
# result = collector.enrich_ioc('185.220.100.252', 'ip')
# print(json.dumps(result, indent=2))
#
# # Enrich file hash
# result = collector.enrich_ioc('abc123...', 'hash')
# print(json.dumps(result, indent=2))

5. STIX/TAXII & Data Sharing

Concept β€” STIX/TAXII & Threat Data Standards
# ===== STIX (Structured Threat Information eXpression) =====

# STIX adalah standar untuk mendeskripsikan threat intelligence
# dalam format JSON yang terstruktur dan dapat dipertukarkan

# STIX 2.1 Domain Objects:
# - Indicator: Pola untuk mendeteksi malicious activity
# - Malware: Deskripsi malware
# - Attack Pattern: Deskripsi teknik serangan
# - Threat Actor: Kelompok/individu penyerang
# - Campaign: Kampanye serangan yang terorganisir
# - Infrastructure: Infrastruktur yang digunakan attacker
# - Vulnerability: Kerentanan yang dieksploitasi
# - Identity: Organisasi atau individu
# - Report: Laporan threat intelligence
# - Sighting: Observasi indicator di lingkungan

# ===== CONTOH STIX INDICATOR =====
# {
#   "type": "indicator",
#   "spec_version": "2.1",
#   "id": "indicator--a1b2c3d4-e5f6-7890-abcd-ef1234567890",
#   "created": "2026-06-26T00:00:00.000Z",
#   "modified": "2026-06-26T00:00:00.000Z",
#   "name": "APT29 C2 Domain",
#   "description": "Command and control domain used by APT29",
#   "indicator_types": ["malicious-activity"],
#   "pattern": "[domain-name:value = 'c2-apt29.example.com']",
#   "pattern_type": "stix",
#   "valid_from": "2026-06-26T00:00:00.000Z",
#   "kill_chain_phases": [
#     {
#       "kill_chain_name": "mitre-attack",
#       "phase_name": "command-and-control"
#     }
#   ]
# }

# ===== CONTOH STIX MALWARE =====
# {
#   "type": "malware",
#   "spec_version": "2.1",
#   "id": "malware--b2c3d4e5-f6a7-8901-bcde-f12345678901",
#   "created": "2026-06-26T00:00:00.000Z",
#   "name": "ShadowTrojan",
#   "malware_types": ["remote-access-trojan"],
#   "is_family": true,
#   "description": "Custom RAT used by APT29 for espionage",
#   "kill_chain_phases": [
#     {
#       "kill_chain_name": "mitre-attack",
#       "phase_name": "execution"
#     }
#   ]
# }

# ===== TAXII (Trusted Automated eXchange of Intelligence) =====

# TAXII adalah protokol untuk bertukar STIX data secara otomatis
# TAXII 2.1 mendefinisikan 3 services:
# 1. Collections β€” Kumpulan STIX objects
# 2. Channels β€” Pub/Sub messaging
# 3. Inbox β€” Menerima STIX objects

# ===== TAXII SERVER (OpenCTI, MISP, dll) =====

# Menerima data dari TAXII server:
# GET /taxii2/collections/ β€” List available collections
# GET /taxii2/collections/{id}/objects/ β€” Get STIX objects
# POST /taxii2/collections/{id}/objects/ β€” Add STIX objects

# Python TAXII client:
# pip install taxii2-client stix2

from taxii2client.v21 import Collection, Server

# Connect to TAXII server
server = Server("https://taxii.example.com/taxii2/")

# List collections
api_root = server.api_roots[0]
for collection in api_root.collections:
    print(f"Collection: {collection.title}")
    print(f"  Description: {collection.description}")
    print(f"  Can Read: {collection.can_read}")
    print(f"  Can Write: {collection.can_write}")

# Get objects from collection
collection = Collection(
    "https://taxii.example.com/taxii2/collections/malware-indicators/",
    user="api_user",
    password="api_pass"
)

# Get STIX objects
objects = collection.get_objects()
for obj in objects.get('objects', []):
    print(f"Type: {obj['type']}, Name: {obj.get('name', 'N/A')}")

6. Threat Hunting

Python β€” Threat Hunting Queries
# ===== THREAT HUNTING =====

# Threat Hunting adalah proaktif mencari ancaman yang
# TIDAK terdeteksi oleh tools otomatis

# HYPOTHESIS-DRIVEN HUNTING:
# 1. Buat hipotesis berdasarkan threat intel
# 2. Buat query/detection untuk membuktikan/menolak
# 3. Analisis hasil
# 4. Refine dan iterate

# ===== CONTOH HUNTING QUERIES =====

# --- KQL (Kusto Query Language) untuk Microsoft Sentinel ---

# 1. Cari anomali login (unusual source country)
# SigninLogs
# | where TimeGenerated > ago(7d)
# | summarize count() by Location, UserPrincipalName
# | where count_ < 5  # Login dari lokasi langka
# | order by count_ asc

# 2. Cari DNS tunneling
# DnsEvents
# | where TimeGenerated > ago(24h)
# | where QueryType == "A"
# | extend DomainLen = strlen(Name)
# | where DomainLen > 50  # Subdomain sangat panjang
# | summarize count() by ClientIP, Name
# | where count_ > 100
# | order by DomainLen desc

# 3. Cari encoded PowerShell commands
# Event
# | where EventID == 4104
# | where ScriptBlockText contains "FromBase64String"
#    or ScriptBlockText contains "-encodedcommand"
#    or ScriptBlockText contains "Invoke-Expression"
# | project TimeGenerated, Computer, ScriptBlockText

# --- Sigma Rules (Platform-agnostic detection) ---

# Sigma rule: Suspicious Scheduled Task Creation
# title: Suspicious Scheduled Task Creation
# status: experimental
# logsource:
#     category: process_creation
#     product: windows
# detection:
#     selection:
#         Image|endswith: '\schtasks.exe'
#         CommandLine|contains:
#             - '/create'
#             - '/sc'
#             - '/tr'
#     filter_legitimate:
#         User|contains:
#             - 'SYSTEM'
#             - 'NT AUTHORITY'
#     condition: selection and not filter_legitimate
# level: medium
# tags:
#     - attack.execution
#     - attack.t1053.005

# --- YARA Rule untuk Hunting ---

# rule CobaltStrike_Beacon {
#     meta:
#         description = "Detects Cobalt Strike Beacon"
#         author = "BeebaneLabs"
#         date = "2026-06-26"
#     strings:
#         $beacon_config = { 00 01 00 01 00 02 ?? ?? 00 02 00 01 00 02 ?? ?? }
#         $pipe = "\\\\.\\pipe\\msagent_" ascii
#         $sleep_mask = { 4C 8B 53 08 45 8B 0A 45 8B 5A 04 4D 8D 52 08 45 85 C9 }
#     condition:
#         2 of them
# }

# ===== HUNTING WORKFLOW =====

# 1. RECEIVE INTEL β†’ Threat intel feed indicates APT29 targeting
#    financial sector with custom RAT "ShadowTrojan"
#
# 2. FORM HYPOTHESIS β†’ "ShadowTrojan may be present in our
#    environment, communicating via HTTPS to specific domains"
#
# 3. CREATE DETECTIONS β†’ Write Sigma/SIEM rules for:
#    - DNS queries to known C2 domains
#    - JA3 hash matching ShadowTrojan TLS fingerprint
#    - Process creation matching ShadowTrojan behavior
#    - Registry modifications matching persistence methods
#
# 4. HUNT β†’ Execute queries across environment
#
# 5. ANALYZE β†’ Review results, validate findings
#
# 6. RESPOND β†’ If found, initiate incident response
#
# 7. IMPROVE β†’ Convert successful hunts into automated detections

7. Automation & Tooling

Kategori Tools Keterangan
TIP (Threat Intel Platform)OpenCTI, MISP, ThreatConnectManajemen & korelasi threat data
SIEMSplunk, Sentinel, Elastic, QRadarDetection & alerting
SOARShuffle SOAR, TheHive, CortexAutomated response
Malware AnalysisCuckoo, CAPE, ANY.RUNDynamic analysis sandbox
Feeds AggregatorMISP, OpenCTI, CRITsAggregate multiple feeds
Detection EngineeringSigma, YARA, Snort/SuricataCustom detection rules
Python β€” Threat Intel Automation
# ===== THREAT INTEL AUTOMATION PIPELINE =====

import requests
import json
import schedule
import time
from datetime import datetime

class ThreatIntelAutomation:
    """Automated threat intelligence collection dan distribution"""
    
    def __init__(self, config):
        self.config = config
        self.iocs = []
    
    # ===== COLLECTOR 1: MalwareBazaar =====
    def collect_malwarebazaar(self):
        """Ambil sample malware terbaru"""
        url = "https://mb-api.abuse.ch/api/v1/"
        data = {
            "query": "get_recent",
            "selector": "100"  # 100 sample terbaru
        }
        
        try:
            resp = requests.post(url, data=data)
            samples = resp.json().get('data', [])
            
            for sample in samples:
                ioc = {
                    'type': 'file_hash',
                    'value': sample.get('sha256_hash'),
                    'source': 'MalwareBazaar',
                    'malware': sample.get('signature', 'unknown'),
                    'tags': sample.get('tags', []),
                    'first_seen': sample.get('first_seen'),
                    'confidence': 80
                }
                self.iocs.append(ioc)
            
            print(f"[+] Collected {len(samples)} samples from MalwareBazaar")
        except Exception as e:
            print(f"[-] MalwareBazaar error: {e}")
    
    # ===== COLLECTOR 2: URLhaus =====
    def collect_urlhaus(self):
        """Ambil URL malware terbaru"""
        url = "https://urlhaus.abuse.ch/downloads/json_recent/"
        
        try:
            resp = resp = requests.get(url)
            data = resp.json()
            
            for entry in list(data.get('urls', []))[:100]:
                ioc = {
                    'type': 'url',
                    'value': entry.get('url'),
                    'source': 'URLhaus',
                    'threat': entry.get('threat', 'unknown'),
                    'tags': entry.get('tags', []),
                    'first_seen': entry.get('firstseen'),
                    'confidence': 75
                }
                self.iocs.append(ioc)
            
            print(f"[+] Collected URLs from URLhaus")
        except Exception as e:
            print(f"[-] URLhaus error: {e}")
    
    # ===== COLLECTOR 3: PhishTank =====
    def collect_phishtank(self):
        """Ambil URL phishing terbaru"""
        url = "http://data.phishtank.com/data/online-valid.json"
        
        try:
            resp = requests.get(url, timeout=30)
            entries = resp.json()
            
            for entry in entries[:100]:
                ioc = {
                    'type': 'url',
                    'value': entry.get('url'),
                    'source': 'PhishTank',
                    'target': entry.get('target', 'unknown'),
                    'verified': entry.get('verified', 'no'),
                    'first_seen': entry.get('verification_time'),
                    'confidence': 85 if entry.get('verified') == 'yes' else 60
                }
                self.iocs.append(ioc)
            
            print(f"[+] Collected {len(entries)} phishing URLs")
        except Exception as e:
            print(f"[-] PhishTank error: {e}")
    
    # ===== DISTRIBUTOR: Push ke SIEM =====
    def push_to_siem(self):
        """Push IoC ke SIEM untuk detection"""
        if not self.iocs:
            return
        
        print(f"\n[*] Pushing {len(self.iocs)} IoCs to SIEM...")
        
        # Contoh: Push ke Elastic via REST API
        # for ioc in self.iocs:
        #     requests.post(
        #         'http://localhost:9200/threat-intel/_doc',
        #         json=ioc,
        #         auth=('elastic', 'password')
        #     )
        
        # Contoh: Push ke MISP
        # misp = ExpandedPyMISP(misp_url, misp_key)
        # for ioc in self.iocs:
        #     misp.add_attribute(event_id, ioc)
        
        print(f"[+] Push complete!")
    
    # ===== DISTRIBUTOR: Generate Blocklists =====
    def generate_blocklist(self, min_confidence=70):
        """Generate blocklist untuk firewall/proxy"""
        high_confidence = [
            ioc for ioc in self.iocs
            if ioc.get('confidence', 0) >= min_confidence
        ]
        
        # IP blocklist
        ip_list = [i['value'] for i in high_confidence if i['type'] == 'ip']
        with open('blocklist_ips.txt', 'w') as f:
            f.write('\n'.join(ip_list))
        
        # Domain blocklist
        domain_list = [i['value'] for i in high_confidence if i['type'] == 'domain']
        with open('blocklist_domains.txt', 'w') as f:
            f.write('\n'.join(domain_list))
        
        # URL blocklist
        url_list = [i['value'] for i in high_confidence if i['type'] == 'url']
        with open('blocklist_urls.txt', 'w') as f:
            f.write('\n'.join(url_list))
        
        print(f"[+] Generated blocklists: {len(ip_list)} IPs, "
              f"{len(domain_list)} domains, {len(url_list)} URLs")
    
    def run_collection(self):
        """Jalankan semua collector"""
        print(f"\n{'='*50}")
        print(f"THREAT INTEL COLLECTION β€” {datetime.utcnow().isoformat()}")
        print(f"{'='*50}")
        
        self.iocs = []  # Reset
        self.collect_malwarebazaar()
        self.collect_urlhaus()
        self.collect_phishtank()
        self.push_to_siem()
        self.generate_blocklist()
    
    def start_scheduler(self, interval_hours=6):
        """Jalankan otomatis setiap N jam"""
        schedule.every(interval_hours).hours.do(self.run_collection)
        
        # Jalankan sekali langsung
        self.run_collection()
        
        while True:
            schedule.run_pending()
            time.sleep(60)

# ===== JALANKAN =====
# automation = ThreatIntelAutomation(config={})
# automation.start_scheduler(interval_hours=6)

8. SIEM Integration

Implementation β€” SIEM Integration
# ===== SIEM INTEGRATION WITH THREAT INTEL =====

# ===== ELASTICSEARCH / ELASTIC SIEM =====

# 1. Create threat intel index
# PUT /threat-intel-iocs
# {
#   "mappings": {
#     "properties": {
#       "indicator_type": { "type": "keyword" },
#       "indicator_value": { "type": "keyword" },
#       "source": { "type": "keyword" },
#       "confidence": { "type": "integer" },
#       "first_seen": { "type": "date" },
#       "last_seen": { "type": "date" },
#       "tags": { "type": "keyword" },
#       "malware_family": { "type": "keyword" }
#     }
#   }
# }

# 2. Enrichment pipeline
# Simpan IoC ke index β†’ Elasticsearch enrichment
# β†’ Match incoming logs β†’ Alert

# ===== SPLUNK =====

# 1. Lookup table untuk IoC
# Simpan CSV: ip, domain, hash, source, confidence
# | inputlookup threat_intel_iocs.csv
# | where indicator_type="ip"

# 2. Correlation search
# index=network sourcetype=firewall
# [| inputlookup threat_intel_iocs.csv
#   | where indicator_type="ip"
#   | fields indicator_value
#   | rename indicator_value as dest_ip]
# | stats count by src_ip, dest_ip, dest_port

# 3. Real-time alert
# index=dns sourcetype=zeek:dhcp
# [| inputlookup threat_intel_domains.csv
#   | where indicator_type="domain"
#   | fields indicator_value
#   | rename indicator_value as query]
# | table _time, src_ip, query, answers

# ===== MICROSOFT SENTINEL =====

# 1. Threat Intelligence Upload
# Sentinels β†’ Threat Intelligence β†’ Upload indicators
# Format: CSV/JSON/STIX

# 2. Analytics Rule: TI Map
# SecurityAlert
# | where TimeGenerated > ago(7d)
# | join kind=inner ThreatIntelligenceIndicator on $left.IPAddress == $right.NetworkIP
# | project TimeGenerated, AlertName, IndicatorName, ConfidenceLevel

# 3. Fusion (automatic correlation)
# Sentinel Fusion mengkorelasikan threat intel
# dengan anomali behavior secara otomatis

# ===== MISP INTEGRATION =====

# MISP sebagai Threat Intelligence Platform
# yang terintegrasi dengan SIEM

# Install MISP:
# sudo apt install misp misp-modules

# Push IoC ke MISP:
# from pymisp import ExpandedPyMISP, MISPEvent
# misp = ExpandedPyMISP(misp_url, misp_key)
# event = MISPEvent()
# event.info = "APT29 C2 Domains"
# event.distribution = 0  # Organisation only
# event.threat_level_id = 1  # High
# event.add_attribute('domain', 'c2-malware.com')
# misp.add_event(event)

# Pull dari MISP ke SIEM:
# MISP β†’ TAXII β†’ SIEM
# Atau: MISP REST API β†’ Custom script β†’ SIEM

9. Best Practices & Implementasi

πŸ“‹ Threat Intelligence Implementation Checklist
  1. Define Requirements β€” Identifikasi kebutuhan TI berdasarkan risiko industri
  2. Select Sources β€” Pilih kombinasi sumber open-source dan komersial
  3. Automate Collection β€” Otomasi pengumpulan data dari feeds
  4. Normalize Data β€” Standardisasi format (STIX 2.1)
  5. Enrich & Contextualize β€” Tambahkan konteks dari multiple sources
  6. Integrate with SIEM β€” Push IoC ke SIEM untuk real-time detection
  7. Create Detection Rules β€” Konversi TI menjadi Sigma/SIEM rules
  8. Feedback Loop β€” Evaluasi dan refine berdasarkan hasil
  9. Share Intel β€” Bergabung dengan ISAC/ISAO untuk sharing
  10. Regular Reviews β€” Review dan update TI program secara berkala

10. Quiz Pemahaman

Uji pemahaman Anda tentang Threat Intelligence:

Pertanyaan 1: Apa perbedaan antara tipe Strategic dan Technical Threat Intelligence?

a) Strategic untuk CISO (keputusan bisnis), Technical untuk SOC analyst (detection/blocking)
b) Strategic hanya tentang malware, Technical tentang semua ancaman
c) Tidak ada perbedaan
d) Strategic lebih teknis dari Technical

Pertanyaan 2: Apa fungsi utama STIX dalam Threat Intelligence?

a) Mengenkripsi data threat intel
b) Standar format untuk mendeskripsikan dan bertukar threat data secara terstruktur
c) Tools untuk scanning malware
d) Database vulnerability

Pertanyaan 3: Mengapa IoC berupa TTP (Tactics, Techniques, Procedures) memiliki "shelf life" paling lama?

a) Karena TTP tidak pernah berubah
b) Karena TTP menggambarkan perilaku yang lebih sulit diubah dibanding infrastruktur teknis
c) Karena TTP dienkripsi
d) Karena TTP hanya digunakan oleh pemerintah

Pertanyaan 4: Apa itu Threat Hunting?

a) Menjalankan antivirus otomatis
b) Proaktif mencari ancaman yang tidak terdeteksi oleh tools otomatis berdasarkan hipotesis
c) Mengumpulkan IP berbahaya dari internet
d) Monitoring log secara manual

Pertanyaan 5: Apa keuntungan menggunakan MITRE ATT&CK dalam program Threat Intelligence?

a) Menggantikan kebutuhan SIEM
b) Menyediakan bahasa umum untuk memetakan TTP, mengidentifikasi gap pertahanan, dan memprioritaskan mitigasi
c) Mengotomasi patching sistem
d) Menghapus malware otomatis
πŸ” Zoom
100%
🎨 Tema