1. Pengenalan Threat Intelligence
Cyber Threat Intelligence (CTI) adalah informasi yang dikumpulkan, dianalisis, dan diinterpretasikan tentang ancaman siber yang sudah ada, sedang berlangsung, atau yang akan datang. CTI membantu organisasi memahami siapa yang menyerang, mengapa, bagaimana, dan apa yang bisa dilakukan untuk melindungi diri.
Threat Intelligence bukan sekadar daftar IP dan domain berbahaya. CTI yang baik memberikan konteks yang memungkinkan pengambilan keputusan keamanan yang lebih tepat β dari strategi investasi keamanan hingga prioritas patch dan konfigurasi.
Tipe Threat Intelligence
| Tipe | Audiens | Contoh | Fungsi |
|---|---|---|---|
| Strategic | CISO, Direksi | Laporan tren ancaman, risiko industri | Pengambilan keputusan bisnis & investasi keamanan |
| Tactical | Security Architect | TTP adversary, MITRE ATT&CK mapping | Desain pertahanan dan arsitektur keamanan |
| Operational | Incident Response | Kampanye serangan aktif, target industri | Respons insiden dan threat hunting |
| Technical | SOC Analyst | IoC (IP, domain, hash), signatures | Detection, blocking, dan monitoring real-time |
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ β THREAT INTELLIGENCE LIFECYCLE β β β β ββββββββββββ ββββββββββββ ββββββββββββ β β β1. PLAN ββββΆβ2. COLLECTββββΆβ3. PROCESSβ β β β β β β β β β β ββ’ Requirementsββ’ OSINT β ββ’ Normalizeβ β β ββ’ Priority β ββ’ Feeds β ββ’ Filter β β β ββ’ Scope β ββ’ Dark webβ ββ’ Dedupe β β β ββ’ Stake- β ββ’ Honeypotβ ββ’ Enrich β β β β holders β ββ’ ISAC β β β β β ββββββββββββ ββββββββββββ ββββββββββββ β β β² β β β β βΌ β β ββββββββββββ ββββββββββββ ββββββββββββ β β β6. FEEDBACKβββββ5. DISTRIBβββββ4. ANALYZEβ β β β β β β β β β β ββ’ Evaluateβ ββ’ Alerts β ββ’ Context β β β ββ’ Improve β ββ’ Reports β ββ’ Correlate β β ββ’ Update β ββ’ SIEM β ββ’ Attributeβ β β β require-β ββ’ TIP β ββ’ Predict β β β β ments β ββ’ Block β β β β β ββββββββββββ ββββββββββββ ββββββββββββ β ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
2. Framework & Model
2.1 MITRE ATT&CK
MITRE ATT&CK adalah framework global yang mendokumentasikan TTP (Tactics, Techniques, and Procedures) adversary berdasarkan observasi dunia nyata. Framework ini menjadi standar industri untuk berkomunikasi tentang ancaman.
# ===== MITRE ATT&CK STRUCTURE ===== # ATT&CK menyusun pengetahuan tentang serangan dalam: # - 14 Tactics (tujuan/tahap serangan) # - 200+ Techniques (cara mencapai tujuan) # - 400+ Sub-techniques (variasi teknik) # - 800+ Procedures (implementasi spesifik) # ===== ENTERPRISE TACTICS ===== # TA0043 Reconnaissance β Mengumpulkan info target # TA0042 Resource Development β Menyiapkan infrastruktur # TA0001 Initial Access β Mendapatkan akses awal # TA0002 Execution β Menjalankan kode # TA0003 Persistence β Mempertahankan akses # TA0004 Privilege Escalation β Meningkatkan hak akses # TA0005 Defense Evasion β Menghindari deteksi # TA0006 Credential Access β Mencuri kredensial # TA0007 Discovery β Menjelajahi lingkungan # TA0008 Lateral Movement β Bergerak ke sistem lain # TA0009 Collection β Mengumpulkan data target # TA0011 Command and Control β Komunikasi dengan C2 # TA0010 Exfiltration β Mencuri data keluar # TA0040 Impact β Merusak atau memanipulasi # ===== CONTOH MAPPING ===== # APT29 (Cozy Bear) β TTP yang terdokumentasi: # TA0001: T1566.002 (Spearphishing Link) # TA0002: T1059.001 (PowerShell) # TA0003: T1547.001 (Registry Run Keys) # TA0005: T1027 (Obfuscated Files) # TA0006: T1003.001 (LSASS Memory) # TA0008: T1021.002 (SMB/Windows Admin Shares) # TA0011: T1071.001 (Web Protocols) # ===== MENGGUNAKAN ATT&CK MATRIX ===== # 1. Identifikasi threat actors yang relevan untuk industri Anda # 2. Mapping TTP mereka ke ATT&CK matrix # 3. Identifikasi gap dalam pertahanan Anda # 4. Prioritaskan mitigasi berdasarkan risk # Online: https://attack.mitre.org/matrices/enterprise/ # Navigator: https://mitre-attack.github.io/attack-navigator/
2.2 Diamond Model & Kill Chain
# ===== CYBER KILL CHAIN (Lockheed Martin) ===== # 7 tahap serangan: # 1. Reconnaissance β Target identification # 2. Weaponization β Create malicious payload # 3. Delivery β Send to victim (email, web, USB) # 4. Exploitation β Trigger vulnerability # 5. Installation β Install malware/backdoor # 6. Command & Control β Establish remote access # 7. Actions on Obj. β Achieve attacker goal # Setiap tahap adalah kesempatan untuk DETEKSI dan PENCEGAHAN # ===== DIAMOND MODEL ===== # Model analisis yang menghubungkan 4 elemen: # # Adversary ββββββββββ Capability # β β # β β # Infrastructure ββββββ Victim # # Setiap event serangan memiliki: # - Adversary: Siapa yang menyerang # - Capability: Tools/malware yang digunakan # - Infrastructure: Server/domain C2 # - Victim: Target yang diserang # # Dengan menganalisis hubungan ini, kita bisa: # - Mengatribusi serangan ke kelompok tertentu # - Memprediksi langkah selanjutnya # - Menghubungkan beberapa insiden # ===== CONTOH ANALISIS ===== # Event: Phishing email terdeteksi # # Adversary: APT group "XYZ" # β TTP: Spearphishing dengan dokumen Office macro # β Motivation: Espionage (intellectual property theft) # # Capability: Custom RAT "ShadowTrojan" # β SHA256: abc123... # β C2 protocol: HTTPS dengan custom encryption # # Infrastructure: # β C2 domain: updates-service[.]com # β C2 IP: 185.x.x.x (hosted di bulletproof hosting) # β Redirector: Cloudflare Workers # # Victim: # β Target: R&D department # β Entry point: email user@target.com # β Compromised host: WORKSTATION-042
3. Sumber Data Threat Intelligence
# ===== SUMBER DATA THREAT INTELLIGENCE =====
# ===== 1. OPEN SOURCE INTELLIGENCE (OSINT) =====
# VirusTotal β Analisis file dan URL
# https://www.virustotal.com
# API: cek hash, domain, IP, URL
curl "https://www.virustotal.com/api/v3/ip_addresses/1.2.3.4" \
-H "x-apikey: YOUR_API_KEY"
# AlienVault OTX β Open Threat Exchange
# https://otx.alienvault.com
# Community-driven threat intelligence
# API endpoint: https://otx.alienvault.com/api/v1/indicators/
# AbuseIPDB β Database IP berbahaya
# https://www.abuseipdb.com
curl "https://api.abuseipdb.com/api/v2/check?ipAddress=1.2.3.4" \
-H "Key: YOUR_API_KEY" \
-H "Accept: application/json"
# URLhaus β Database URL malware
# https://urlhaus.abuse.ch/api/
# MalwareBazaar β Database sample malware
# https://bazaar.abuse.ch/api/
# PhishTank β Database phishing URLs
# https://phishtank.org/developer_info.php
# Shodan β Internet-connected device search
# https://www.shodan.io
# Cari exposed services, vulnerable devices
# Censys β Internet-wide scanning data
# https://search.censys.io
# ===== 2. COMMUNITY & GOVERNMENT SOURCES =====
# ID-SIRTII/CC (Indonesia)
# https://idsirtii.or.id
# National CSIRT Indonesia β alert dan advisory
# CISA (US Cybersecurity & Infrastructure)
# https://www.cisa.gov/cybersecurity-advisories
# Known Exploited Vulnerabilities (KEV) catalog
# MITRE ATT&CK
# https://attack.mitre.org
# TTP database dan mapping
# FIRST β Forum of Incident Response Teams
# https://www.first.org
# ===== 3. COMMERCIAL THREAT INTELLIGENCE =====
# Recorded Future β AI-powered threat intelligence
# Mandiant Advantage β Threat intelligence platform
# CrowdStrike Falcon β Threat intelligence feed
# Palo Alto Unit 42 β Research & threat intel
# Cisco Talos β Threat intelligence team
# Microsoft Threat Intelligence β MSTIC
# ===== 4. ISAC (Information Sharing and Analysis Center) =====
# FS-ISAC β Financial Services
# H-ISAC β Healthcare
# IT-ISAC β Information Technology
# E-ISAC β Energy
# MS-ISAC β State/Local Government
# Aviation ISAC, Retail ISAC, dll.
# ===== 5. DARK WEB MONITORING =====
# Monitor dark web forum, marketplace, dan paste sites
# untuk data breach, credential leaks, zero-day sales
# Tools: DarkOwl, Flashpoint, Recorded Future
4. Indikator Kompromi (IoC)
Indicators of Compromise (IoC) adalah artefak teknis yang menunjukkan adanya intrusi atau aktivitas berbahaya. IoC adalah output paling langsung dari threat intelligence yang digunakan oleh SOC untuk detection dan blocking.
4.1 Jenis-Jenis IoC
| Jenis IoC | Contoh | Confidence | Shelf Life |
|---|---|---|---|
| IP Address | 185.220.100.252 | Medium | Pendek (berubah cepat) |
| Domain | c2-malware[.]com | Medium | PendekβSedang |
| URL | hxxps://c2[.]com/payload.dll | MediumβHigh | Pendek |
| File Hash | SHA256: a1b2c3... | High | Permanen (tapi bisa diubah) |
| Email Address | attacker@phish.com | Medium | Sedang |
| YARA Rule | Pola byte malware | High | Lama |
| JA3/JA3S Hash | TLS fingerprint | High | Lama |
| Registry Key | HKCU\Software\Malware | High | Lama |
| Mutex | Global\MalwareMutex | High | Lama |
| TTP | Technique behavior | Highest | Paling lama |
# ===== IOC COLLECTION & ENRICHMENT =====
import requests
import hashlib
import json
from datetime import datetime
class ThreatIntelCollector:
"""Collector untuk mengumpulkan dan enrich IoC"""
def __init__(self, vt_api_key=None, otx_key=None, abuse_key=None):
self.vt_key = vt_api_key
self.otx_key = otx_key
self.abuse_key = abuse_key
# ===== VIRUSTOTAL =====
def check_virustotal(self, indicator, indicator_type='ip'):
"""Cek IoC di VirusTotal"""
if not self.vt_key:
return None
headers = {'x-apikey': self.vt_key}
base = 'https://www.virustotal.com/api/v3'
endpoints = {
'ip': f'{base}/ip_addresses/{indicator}',
'domain': f'{base}/domains/{indicator}',
'hash': f'{base}/files/{indicator}',
'url': f'{base}/urls/{indicator}'
}
try:
resp = requests.get(
endpoints.get(indicator_type, endpoints['ip']),
headers=headers
)
data = resp.json()
if 'data' in data:
attrs = data['data']['attributes']
return {
'source': 'VirusTotal',
'indicator': indicator,
'type': indicator_type,
'malicious': attrs.get('last_analysis_stats', {}).get('malicious', 0),
'suspicious': attrs.get('last_analysis_stats', {}).get('suspicious', 0),
'harmless': attrs.get('last_analysis_stats', {}).get('harmless', 0),
'reputation': attrs.get('reputation', 0),
'tags': attrs.get('tags', []),
'checked_at': datetime.utcnow().isoformat()
}
except Exception as e:
return {'error': str(e)}
# ===== ALIENVAULT OTX =====
def check_otx(self, indicator, indicator_type='IPv4'):
"""Cek IoC di AlienVault OTX"""
if not self.otx_key:
return None
headers = {'X-OTX-API-KEY': self.otx_key}
base = 'https://otx.alienvault.com/api/v1/indicators'
endpoints = {
'IPv4': f'{base}/IPv4/{indicator}/general',
'domain': f'{base}/domain/{indicator}/general',
'file': f'{base}/file/{indicator}/general',
'URL': f'{base}/url/{indicator}/general'
}
try:
resp = requests.get(
endpoints.get(indicator_type, endpoints['IPv4']),
headers=headers
)
data = resp.json()
return {
'source': 'OTX',
'indicator': indicator,
'pulse_count': data.get('pulse_info', {}).get('count', 0),
'reputation': data.get('reputation', 0),
'country': data.get('country_name', 'Unknown'),
'checked_at': datetime.utcnow().isoformat()
}
except Exception as e:
return {'error': str(e)}
# ===== ABUSEIPDB =====
def check_abuseipdb(self, ip):
"""Cek IP di AbuseIPDB"""
if not self.abuse_key:
return None
headers = {
'Key': self.abuse_key,
'Accept': 'application/json'
}
try:
resp = requests.get(
'https://api.abuseipdb.com/api/v2/check',
headers=headers,
params={'ipAddress': ip, 'maxAgeInDays': 90}
)
data = resp.json().get('data', {})
return {
'source': 'AbuseIPDB',
'indicator': ip,
'abuse_confidence_score': data.get('abuseConfidenceScore', 0),
'total_reports': data.get('totalReports', 0),
'isp': data.get('isp', 'Unknown'),
'country': data.get('countryCode', 'Unknown'),
'is_tor': data.get('isTor', False),
'checked_at': datetime.utcnow().isoformat()
}
except Exception as e:
return {'error': str(e)}
def enrich_ioc(self, indicator, indicator_type='ip'):
"""Enrich IoC dari multiple sources"""
results = {
'indicator': indicator,
'type': indicator_type,
'enrichments': [],
'risk_score': 0,
'analyzed_at': datetime.utcnow().isoformat()
}
# Query semua source
if indicator_type == 'ip':
vt = self.check_virustotal(indicator, 'ip')
otx = self.check_otx(indicator, 'IPv4')
abuse = self.check_abuseipdb(indicator)
if vt: results['enrichments'].append(vt)
if otx: results['enrichments'].append(otx)
if abuse: results['enrichments'].append(abuse)
# Hitung aggregate risk score
scores = []
for e in results['enrichments']:
if 'malicious' in e:
scores.append(min(e['malicious'] * 2, 100))
if 'abuse_confidence_score' in e:
scores.append(e['abuse_confidence_score'])
if 'pulse_count' in e and e['pulse_count'] > 0:
scores.append(min(e['pulse_count'] * 10, 100))
results['risk_score'] = int(sum(scores) / max(len(scores), 1))
return results
# ===== USAGE =====
# collector = ThreatIntelCollector(
# vt_api_key='YOUR_VT_KEY',
# otx_key='YOUR_OTX_KEY',
# abuse_key='YOUR_ABUSE_KEY'
# )
#
# # Enrich IP address
# result = collector.enrich_ioc('185.220.100.252', 'ip')
# print(json.dumps(result, indent=2))
#
# # Enrich file hash
# result = collector.enrich_ioc('abc123...', 'hash')
# print(json.dumps(result, indent=2))
5. STIX/TAXII & Data Sharing
# ===== STIX (Structured Threat Information eXpression) =====
# STIX adalah standar untuk mendeskripsikan threat intelligence
# dalam format JSON yang terstruktur dan dapat dipertukarkan
# STIX 2.1 Domain Objects:
# - Indicator: Pola untuk mendeteksi malicious activity
# - Malware: Deskripsi malware
# - Attack Pattern: Deskripsi teknik serangan
# - Threat Actor: Kelompok/individu penyerang
# - Campaign: Kampanye serangan yang terorganisir
# - Infrastructure: Infrastruktur yang digunakan attacker
# - Vulnerability: Kerentanan yang dieksploitasi
# - Identity: Organisasi atau individu
# - Report: Laporan threat intelligence
# - Sighting: Observasi indicator di lingkungan
# ===== CONTOH STIX INDICATOR =====
# {
# "type": "indicator",
# "spec_version": "2.1",
# "id": "indicator--a1b2c3d4-e5f6-7890-abcd-ef1234567890",
# "created": "2026-06-26T00:00:00.000Z",
# "modified": "2026-06-26T00:00:00.000Z",
# "name": "APT29 C2 Domain",
# "description": "Command and control domain used by APT29",
# "indicator_types": ["malicious-activity"],
# "pattern": "[domain-name:value = 'c2-apt29.example.com']",
# "pattern_type": "stix",
# "valid_from": "2026-06-26T00:00:00.000Z",
# "kill_chain_phases": [
# {
# "kill_chain_name": "mitre-attack",
# "phase_name": "command-and-control"
# }
# ]
# }
# ===== CONTOH STIX MALWARE =====
# {
# "type": "malware",
# "spec_version": "2.1",
# "id": "malware--b2c3d4e5-f6a7-8901-bcde-f12345678901",
# "created": "2026-06-26T00:00:00.000Z",
# "name": "ShadowTrojan",
# "malware_types": ["remote-access-trojan"],
# "is_family": true,
# "description": "Custom RAT used by APT29 for espionage",
# "kill_chain_phases": [
# {
# "kill_chain_name": "mitre-attack",
# "phase_name": "execution"
# }
# ]
# }
# ===== TAXII (Trusted Automated eXchange of Intelligence) =====
# TAXII adalah protokol untuk bertukar STIX data secara otomatis
# TAXII 2.1 mendefinisikan 3 services:
# 1. Collections β Kumpulan STIX objects
# 2. Channels β Pub/Sub messaging
# 3. Inbox β Menerima STIX objects
# ===== TAXII SERVER (OpenCTI, MISP, dll) =====
# Menerima data dari TAXII server:
# GET /taxii2/collections/ β List available collections
# GET /taxii2/collections/{id}/objects/ β Get STIX objects
# POST /taxii2/collections/{id}/objects/ β Add STIX objects
# Python TAXII client:
# pip install taxii2-client stix2
from taxii2client.v21 import Collection, Server
# Connect to TAXII server
server = Server("https://taxii.example.com/taxii2/")
# List collections
api_root = server.api_roots[0]
for collection in api_root.collections:
print(f"Collection: {collection.title}")
print(f" Description: {collection.description}")
print(f" Can Read: {collection.can_read}")
print(f" Can Write: {collection.can_write}")
# Get objects from collection
collection = Collection(
"https://taxii.example.com/taxii2/collections/malware-indicators/",
user="api_user",
password="api_pass"
)
# Get STIX objects
objects = collection.get_objects()
for obj in objects.get('objects', []):
print(f"Type: {obj['type']}, Name: {obj.get('name', 'N/A')}")
6. Threat Hunting
# ===== THREAT HUNTING =====
# Threat Hunting adalah proaktif mencari ancaman yang
# TIDAK terdeteksi oleh tools otomatis
# HYPOTHESIS-DRIVEN HUNTING:
# 1. Buat hipotesis berdasarkan threat intel
# 2. Buat query/detection untuk membuktikan/menolak
# 3. Analisis hasil
# 4. Refine dan iterate
# ===== CONTOH HUNTING QUERIES =====
# --- KQL (Kusto Query Language) untuk Microsoft Sentinel ---
# 1. Cari anomali login (unusual source country)
# SigninLogs
# | where TimeGenerated > ago(7d)
# | summarize count() by Location, UserPrincipalName
# | where count_ < 5 # Login dari lokasi langka
# | order by count_ asc
# 2. Cari DNS tunneling
# DnsEvents
# | where TimeGenerated > ago(24h)
# | where QueryType == "A"
# | extend DomainLen = strlen(Name)
# | where DomainLen > 50 # Subdomain sangat panjang
# | summarize count() by ClientIP, Name
# | where count_ > 100
# | order by DomainLen desc
# 3. Cari encoded PowerShell commands
# Event
# | where EventID == 4104
# | where ScriptBlockText contains "FromBase64String"
# or ScriptBlockText contains "-encodedcommand"
# or ScriptBlockText contains "Invoke-Expression"
# | project TimeGenerated, Computer, ScriptBlockText
# --- Sigma Rules (Platform-agnostic detection) ---
# Sigma rule: Suspicious Scheduled Task Creation
# title: Suspicious Scheduled Task Creation
# status: experimental
# logsource:
# category: process_creation
# product: windows
# detection:
# selection:
# Image|endswith: '\schtasks.exe'
# CommandLine|contains:
# - '/create'
# - '/sc'
# - '/tr'
# filter_legitimate:
# User|contains:
# - 'SYSTEM'
# - 'NT AUTHORITY'
# condition: selection and not filter_legitimate
# level: medium
# tags:
# - attack.execution
# - attack.t1053.005
# --- YARA Rule untuk Hunting ---
# rule CobaltStrike_Beacon {
# meta:
# description = "Detects Cobalt Strike Beacon"
# author = "BeebaneLabs"
# date = "2026-06-26"
# strings:
# $beacon_config = { 00 01 00 01 00 02 ?? ?? 00 02 00 01 00 02 ?? ?? }
# $pipe = "\\\\.\\pipe\\msagent_" ascii
# $sleep_mask = { 4C 8B 53 08 45 8B 0A 45 8B 5A 04 4D 8D 52 08 45 85 C9 }
# condition:
# 2 of them
# }
# ===== HUNTING WORKFLOW =====
# 1. RECEIVE INTEL β Threat intel feed indicates APT29 targeting
# financial sector with custom RAT "ShadowTrojan"
#
# 2. FORM HYPOTHESIS β "ShadowTrojan may be present in our
# environment, communicating via HTTPS to specific domains"
#
# 3. CREATE DETECTIONS β Write Sigma/SIEM rules for:
# - DNS queries to known C2 domains
# - JA3 hash matching ShadowTrojan TLS fingerprint
# - Process creation matching ShadowTrojan behavior
# - Registry modifications matching persistence methods
#
# 4. HUNT β Execute queries across environment
#
# 5. ANALYZE β Review results, validate findings
#
# 6. RESPOND β If found, initiate incident response
#
# 7. IMPROVE β Convert successful hunts into automated detections
7. Automation & Tooling
| Kategori | Tools | Keterangan |
|---|---|---|
| TIP (Threat Intel Platform) | OpenCTI, MISP, ThreatConnect | Manajemen & korelasi threat data |
| SIEM | Splunk, Sentinel, Elastic, QRadar | Detection & alerting |
| SOAR | Shuffle SOAR, TheHive, Cortex | Automated response |
| Malware Analysis | Cuckoo, CAPE, ANY.RUN | Dynamic analysis sandbox |
| Feeds Aggregator | MISP, OpenCTI, CRITs | Aggregate multiple feeds |
| Detection Engineering | Sigma, YARA, Snort/Suricata | Custom detection rules |
# ===== THREAT INTEL AUTOMATION PIPELINE =====
import requests
import json
import schedule
import time
from datetime import datetime
class ThreatIntelAutomation:
"""Automated threat intelligence collection dan distribution"""
def __init__(self, config):
self.config = config
self.iocs = []
# ===== COLLECTOR 1: MalwareBazaar =====
def collect_malwarebazaar(self):
"""Ambil sample malware terbaru"""
url = "https://mb-api.abuse.ch/api/v1/"
data = {
"query": "get_recent",
"selector": "100" # 100 sample terbaru
}
try:
resp = requests.post(url, data=data)
samples = resp.json().get('data', [])
for sample in samples:
ioc = {
'type': 'file_hash',
'value': sample.get('sha256_hash'),
'source': 'MalwareBazaar',
'malware': sample.get('signature', 'unknown'),
'tags': sample.get('tags', []),
'first_seen': sample.get('first_seen'),
'confidence': 80
}
self.iocs.append(ioc)
print(f"[+] Collected {len(samples)} samples from MalwareBazaar")
except Exception as e:
print(f"[-] MalwareBazaar error: {e}")
# ===== COLLECTOR 2: URLhaus =====
def collect_urlhaus(self):
"""Ambil URL malware terbaru"""
url = "https://urlhaus.abuse.ch/downloads/json_recent/"
try:
resp = resp = requests.get(url)
data = resp.json()
for entry in list(data.get('urls', []))[:100]:
ioc = {
'type': 'url',
'value': entry.get('url'),
'source': 'URLhaus',
'threat': entry.get('threat', 'unknown'),
'tags': entry.get('tags', []),
'first_seen': entry.get('firstseen'),
'confidence': 75
}
self.iocs.append(ioc)
print(f"[+] Collected URLs from URLhaus")
except Exception as e:
print(f"[-] URLhaus error: {e}")
# ===== COLLECTOR 3: PhishTank =====
def collect_phishtank(self):
"""Ambil URL phishing terbaru"""
url = "http://data.phishtank.com/data/online-valid.json"
try:
resp = requests.get(url, timeout=30)
entries = resp.json()
for entry in entries[:100]:
ioc = {
'type': 'url',
'value': entry.get('url'),
'source': 'PhishTank',
'target': entry.get('target', 'unknown'),
'verified': entry.get('verified', 'no'),
'first_seen': entry.get('verification_time'),
'confidence': 85 if entry.get('verified') == 'yes' else 60
}
self.iocs.append(ioc)
print(f"[+] Collected {len(entries)} phishing URLs")
except Exception as e:
print(f"[-] PhishTank error: {e}")
# ===== DISTRIBUTOR: Push ke SIEM =====
def push_to_siem(self):
"""Push IoC ke SIEM untuk detection"""
if not self.iocs:
return
print(f"\n[*] Pushing {len(self.iocs)} IoCs to SIEM...")
# Contoh: Push ke Elastic via REST API
# for ioc in self.iocs:
# requests.post(
# 'http://localhost:9200/threat-intel/_doc',
# json=ioc,
# auth=('elastic', 'password')
# )
# Contoh: Push ke MISP
# misp = ExpandedPyMISP(misp_url, misp_key)
# for ioc in self.iocs:
# misp.add_attribute(event_id, ioc)
print(f"[+] Push complete!")
# ===== DISTRIBUTOR: Generate Blocklists =====
def generate_blocklist(self, min_confidence=70):
"""Generate blocklist untuk firewall/proxy"""
high_confidence = [
ioc for ioc in self.iocs
if ioc.get('confidence', 0) >= min_confidence
]
# IP blocklist
ip_list = [i['value'] for i in high_confidence if i['type'] == 'ip']
with open('blocklist_ips.txt', 'w') as f:
f.write('\n'.join(ip_list))
# Domain blocklist
domain_list = [i['value'] for i in high_confidence if i['type'] == 'domain']
with open('blocklist_domains.txt', 'w') as f:
f.write('\n'.join(domain_list))
# URL blocklist
url_list = [i['value'] for i in high_confidence if i['type'] == 'url']
with open('blocklist_urls.txt', 'w') as f:
f.write('\n'.join(url_list))
print(f"[+] Generated blocklists: {len(ip_list)} IPs, "
f"{len(domain_list)} domains, {len(url_list)} URLs")
def run_collection(self):
"""Jalankan semua collector"""
print(f"\n{'='*50}")
print(f"THREAT INTEL COLLECTION β {datetime.utcnow().isoformat()}")
print(f"{'='*50}")
self.iocs = [] # Reset
self.collect_malwarebazaar()
self.collect_urlhaus()
self.collect_phishtank()
self.push_to_siem()
self.generate_blocklist()
def start_scheduler(self, interval_hours=6):
"""Jalankan otomatis setiap N jam"""
schedule.every(interval_hours).hours.do(self.run_collection)
# Jalankan sekali langsung
self.run_collection()
while True:
schedule.run_pending()
time.sleep(60)
# ===== JALANKAN =====
# automation = ThreatIntelAutomation(config={})
# automation.start_scheduler(interval_hours=6)
8. SIEM Integration
# ===== SIEM INTEGRATION WITH THREAT INTEL =====
# ===== ELASTICSEARCH / ELASTIC SIEM =====
# 1. Create threat intel index
# PUT /threat-intel-iocs
# {
# "mappings": {
# "properties": {
# "indicator_type": { "type": "keyword" },
# "indicator_value": { "type": "keyword" },
# "source": { "type": "keyword" },
# "confidence": { "type": "integer" },
# "first_seen": { "type": "date" },
# "last_seen": { "type": "date" },
# "tags": { "type": "keyword" },
# "malware_family": { "type": "keyword" }
# }
# }
# }
# 2. Enrichment pipeline
# Simpan IoC ke index β Elasticsearch enrichment
# β Match incoming logs β Alert
# ===== SPLUNK =====
# 1. Lookup table untuk IoC
# Simpan CSV: ip, domain, hash, source, confidence
# | inputlookup threat_intel_iocs.csv
# | where indicator_type="ip"
# 2. Correlation search
# index=network sourcetype=firewall
# [| inputlookup threat_intel_iocs.csv
# | where indicator_type="ip"
# | fields indicator_value
# | rename indicator_value as dest_ip]
# | stats count by src_ip, dest_ip, dest_port
# 3. Real-time alert
# index=dns sourcetype=zeek:dhcp
# [| inputlookup threat_intel_domains.csv
# | where indicator_type="domain"
# | fields indicator_value
# | rename indicator_value as query]
# | table _time, src_ip, query, answers
# ===== MICROSOFT SENTINEL =====
# 1. Threat Intelligence Upload
# Sentinels β Threat Intelligence β Upload indicators
# Format: CSV/JSON/STIX
# 2. Analytics Rule: TI Map
# SecurityAlert
# | where TimeGenerated > ago(7d)
# | join kind=inner ThreatIntelligenceIndicator on $left.IPAddress == $right.NetworkIP
# | project TimeGenerated, AlertName, IndicatorName, ConfidenceLevel
# 3. Fusion (automatic correlation)
# Sentinel Fusion mengkorelasikan threat intel
# dengan anomali behavior secara otomatis
# ===== MISP INTEGRATION =====
# MISP sebagai Threat Intelligence Platform
# yang terintegrasi dengan SIEM
# Install MISP:
# sudo apt install misp misp-modules
# Push IoC ke MISP:
# from pymisp import ExpandedPyMISP, MISPEvent
# misp = ExpandedPyMISP(misp_url, misp_key)
# event = MISPEvent()
# event.info = "APT29 C2 Domains"
# event.distribution = 0 # Organisation only
# event.threat_level_id = 1 # High
# event.add_attribute('domain', 'c2-malware.com')
# misp.add_event(event)
# Pull dari MISP ke SIEM:
# MISP β TAXII β SIEM
# Atau: MISP REST API β Custom script β SIEM
9. Best Practices & Implementasi
- Define Requirements β Identifikasi kebutuhan TI berdasarkan risiko industri
- Select Sources β Pilih kombinasi sumber open-source dan komersial
- Automate Collection β Otomasi pengumpulan data dari feeds
- Normalize Data β Standardisasi format (STIX 2.1)
- Enrich & Contextualize β Tambahkan konteks dari multiple sources
- Integrate with SIEM β Push IoC ke SIEM untuk real-time detection
- Create Detection Rules β Konversi TI menjadi Sigma/SIEM rules
- Feedback Loop β Evaluasi dan refine berdasarkan hasil
- Share Intel β Bergabung dengan ISAC/ISAO untuk sharing
- Regular Reviews β Review dan update TI program secara berkala
10. Quiz Pemahaman
Uji pemahaman Anda tentang Threat Intelligence: