Situatie
Pentru a scrie un script complet, trebuie adoptata o abordare modulară. Scriptul se va conecta la un server de email (via IMAP), va descărca mesajele noi, va extrage link-urile și atașamentele, le va scana și va trimite rezultatul către un fișier JSON pe care Wazuh îl va “înghiți” instantaneu.
Solutie
Pasi de urmat
- Identificarea Amenințărilor: Scriptul nu se uită doar dacă e-mailul e “enervant”, ci caută indicatori de compromitere (IoC): URL-uri raportate de peste 70 de motoare de antivirus și macro-uri malițioase.
-
Fuziunea datelor: Combinăm datele de la VirusTotal (threat intel) cu Oletools (analiză locală).
-
Raportare în timp real: Rezultatul este un JSON curat. Wazuh vede acest JSON și, pe baza câmpului
threat_level, poate genera o alertă de e-mail către echipa de securitate sau poate bloca IP-ul expeditorului la nivel de firewall.
Scriptul complet spam_analyzer.py
import imaplib
import mailparser
import requests
import hashlib
import json
import os
import re
from oletools.olevba import VBA_Parser
# — CONFIGURARE —
IMAP_SERVER = ‘imap.gmail.com’
EMAIL_USER = ‘contul_tau@gmail.com’
EMAIL_PASS = ‘parola_ta_aplicatie’
VT_API_KEY = ‘CHEIA_TA_VIRUSTOTAL’
WAZUH_LOG_PATH = ‘/var/log/email_analysis.log’
# — FUNCȚII DE ANALIZĂ —
def check_vt_url(url):
“””Verifică reputația URL-ului pe VirusTotal”””
print(f”[#] Scanare URL: {url}”)
vt_url = “https://www.virustotal.com/api/v3/urls”
import base64
url_id = base64.urlsafe_b64encode(url.encode()).decode().strip(“=”)
headers = {“x-apikey”: VT_API_KEY}
response = requests.get(f”{vt_url}/{url_id}”, headers=headers)
if response.status_code == 200:
stats = response.json()[‘data’][‘attributes’][‘last_analysis_stats’]
return stats[‘malicious’] + stats[‘suspicious’]
return 0
def check_file_malware(file_content, filename):
“””Analiză statică și verificare Hash pe VirusTotal”””
file_hash = hashlib.sha256(file_content).hexdigest()
headers = {“x-apikey”: VT_API_KEY}
# 1. Verificare Hash
res = requests.get(f”https://www.virustotal.com/api/v3/files/{file_hash}”, headers=headers)
vt_score = 0
if res.status_code == 200:
stats = res.json()[‘data’][‘attributes’][‘last_analysis_stats’]
vt_score = stats[‘malicious’]
# 2. Verificare Macro (pentru doc, xls)
is_macro_dangerous = False
if filename.endswith((‘.doc’, ‘.xls’, ‘.docm’, ‘.xlsm’)):
vba = VBA_Parser(filename=filename, data=file_content)
if vba.detect_vba_macros():
is_macro_dangerous = True
return {“vt_score”: vt_score, “has_macros”: is_macro_dangerous}
# — PROCESARE EMAIL —
def process_emails():
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(EMAIL_USER, EMAIL_PASS)
mail.select(‘inbox’)
# Căutăm email-uri necitite
_, data = mail.search(None, ‘UNSEEN’)
for num in data[0].split():
_, msg_data = mail.fetch(num, ‘(RFC822)’)
raw_email = msg_data[0][1]
email = mailparser.parse_from_bytes(raw_email)
report = {
“integration”: “email_spam_analysis”,
“from”: email.from_[0][1],
“subject”: email.subject,
“urls_detected”: [],
“attachments_detected”: [],
“threat_level”: “low”
}
# 1. Analiză URL-uri
urls = re.findall(r'(https?://[^\s<>”]+|www\.[^\s<>”]+)’, email.body)
for u in list(set(urls)):
score = check_vt_url(u)
if score > 0:
report[“urls_detected”].append({“url”: u, “malicious_score”: score})
report[“threat_level”] = “high”
# 2. Analiză Atașamente
for att in email.attachments:
analysis = check_file_malware(att[‘payload’], att[‘filename’])
report[“attachments_detected”].append({
“filename”: att[‘filename’],
“vt_malicious”: analysis[‘vt_score’],
“macros”: analysis[‘has_macros’]
})
if analysis[‘vt_score’] > 0 or analysis[‘has_macros’]:
report[“threat_level”] = “critical”
# Trimitem rezultatul către log-ul monitorizat de Wazuh
with open(WAZUH_LOG_PATH, ‘a’) as f:
f.write(json.dumps(report) + ‘\n’)
print(f”[!] Analiză finalizată pentru: {email.subject}”)
if __name__ == “__main__”:
process_emails()
Leave A Comment?