import ipaddress
import logging
import os
import subprocess
import threading
import time
import tkinter as tk
from collections import defaultdict
from tkinter import ttk, scrolledtext, filedialog

from scapy.all import *  # provides IP and sniff; kept first in this group so the explicit imports below win any name clash
import psutil
import yara
from PIL import Image, ImageTk # Para manejar imágenes
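class SecurityMonitor:
    """Network, host-resource and file-system monitor behind the BlockMax GUI.

    ``gui`` must expose ``update_log(message)``; every finding goes both to
    ``security.log`` and to that callback. Packet capture and host-firewall
    changes need elevated privileges on every supported platform.
    """

    def __init__(self, gui):
        self.gui = gui
        # Packets from one source IP within one window before that source is
        # treated as a flood. The window is reset in analyze_packet; without
        # it the counter would be cumulative and every long-lived peer would
        # eventually be blocked.
        self.attack_threshold = 100
        self.window_seconds = 1.0
        self._window_start = time.monotonic()
        self.connection_count = defaultdict(int)
        self.blocked_ips = set()
        # CPU / memory percentage above which monitor_system raises an alert.
        self.resource_threshold = 90
        self.logger = self._setup_logger()
        # Set by stop(); both monitoring loops observe it.
        self.stop_event = threading.Event()
        self.scanning = False
        # Compile the YARA rules once. A missing or broken rules file is
        # reported instead of aborting start-up; scan_files refuses to run
        # until rules are available.
        self.yara_rules = None
        try:
            self.yara_rules = yara.compile('rules.yar')
        except yara.Error as e:
            self.logger.error(f"No se pudieron compilar las reglas YARA: {e}")

    def _setup_logger(self):
        """Return the 'SecurityMonitor' file logger, attaching its handler only once.

        Re-using the logger without the handler guard would duplicate every
        line each time a monitor is constructed.
        """
        logger = logging.getLogger('SecurityMonitor')
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            # Messages contain accents and emoji; pin the encoding so the log
            # is written correctly regardless of the platform default codec.
            handler = logging.FileHandler('security.log', encoding="utf-8")
            handler.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
            logger.addHandler(handler)
        return logger

    def analyze_packet(self, packet):
        """Count IPv4 packets per source within the current window and flag floods.

        Called by scapy for every captured packet. Non-IP packets are
        ignored. The per-source counters are cleared when the window
        (``window_seconds``) elapses, so ``attack_threshold`` is a rate.
        """
        if IP not in packet:
            return
        now = time.monotonic()
        if now - self._window_start >= self.window_seconds:
            self.connection_count.clear()
            self._window_start = now
        src_ip = packet[IP].src
        self.connection_count[src_ip] += 1
        if self.connection_count[src_ip] > self.attack_threshold:
            self.handle_potential_attack(src_ip)

    def handle_potential_attack(self, ip):
        """Record *ip* as a suspected flood source and block it exactly once."""
        if ip in self.blocked_ips:
            return
        self.blocked_ips.add(ip)
        self.logger.warning(f"Posible ataque DoS detectado desde {ip}")
        self.gui.update_log(f"⚠️ Posible ataque DoS detectado desde {ip}")
        self.apply_firewall_rule(ip)

    @staticmethod
    def _should_block(ip):
        """Return True only for a literal unicast address worth a firewall rule.

        Anything that is not a valid IP literal is rejected because the value
        is handed to an external command. Loopback, multicast and unspecified
        addresses are rejected too: they never identify a remote attacker,
        and blocking them would only harm the host itself.
        """
        try:
            addr = ipaddress.ip_address(ip)
        except ValueError:
            return False
        return not (addr.is_loopback or addr.is_multicast or addr.is_unspecified)

    @staticmethod
    def _firewall_command(ip, os_name=None):
        """Return the argv list that drops inbound traffic from *ip*, or None.

        ``os_name`` defaults to ``os.name``. On POSIX the IPv6 tool is chosen
        from the address family, so *ip* must be a valid literal there
        (``ValueError`` otherwise). The argv form, run without a shell, keeps
        the address from ever being interpreted by a command interpreter.
        """
        if os_name is None:
            os_name = os.name
        if os_name == 'posix':
            tool = 'ip6tables' if ipaddress.ip_address(ip).version == 6 else 'iptables'
            return [tool, '-A', 'INPUT', '-s', ip, '-j', 'DROP']
        if os_name == 'nt':
            return ['netsh', 'advfirewall', 'firewall', 'add', 'rule',
                    f'name=Block {ip}', 'dir=in', 'action=block', f'remoteip={ip}']
        return None

    def apply_firewall_rule(self, ip):
        """Add a host-firewall drop rule for *ip* and report the real outcome.

        NOTE: a packet's source address can be forged, so an automatic block
        keyed on it alone can be turned against legitimate hosts. Review
        security.log before trusting a block and keep infrastructure
        addresses (gateway, DNS, management hosts) in mind when deploying.
        """
        if not self._should_block(ip):
            self.logger.warning(f"Dirección no bloqueable ignorada: {ip!r}")
            self.gui.update_log(f"⚠️ Dirección no bloqueable ignorada: {ip}")
            return
        command = self._firewall_command(ip)
        if command is None:
            self.logger.error(f"Plataforma no soportada para reglas de firewall: {os.name}")
            self.gui.update_log(f"❌ Plataforma no soportada para reglas de firewall: {os.name}")
            return
        try:
            # check=True turns a non-zero exit (typically missing privileges)
            # into an exception, so success is only reported when the rule
            # was actually accepted.
            subprocess.run(command, check=True, stdout=subprocess.DEVNULL,
                           stderr=subprocess.PIPE, universal_newlines=True)
        except (subprocess.CalledProcessError, OSError) as e:
            detail = (getattr(e, "stderr", None) or str(e)).strip()
            self.logger.error(f"Error al aplicar regla de firewall: {detail}")
            self.gui.update_log(f"❌ Error al aplicar regla de firewall: {detail}")
            return
        self.logger.info(f"IP {ip} bloqueada en el firewall")
        self.gui.update_log(f"✅ IP {ip} bloqueada en el firewall")

    def scan_files(self, path):
        """Walk *path* recursively and report every file matching the YARA rules.

        Per-file scan errors are reported and the walk continues; only an
        unexpected failure of the walk itself ends the scan early.
        """
        if self.yara_rules is None:
            self.gui.update_log("❌ Reglas YARA no disponibles; escaneo cancelado.")
            return
        try:
            for root, _, files in os.walk(path):
                for file in files:
                    full_path = os.path.join(root, file)
                    try:
                        matches = self.yara_rules.match(full_path)
                    except yara.Error as e:
                        self.gui.update_log(f"Error analizando {full_path}: {e}")
                        continue
                    if matches:
                        mensaje = f"⚠️ Malware detectado en {full_path} | Coincidencias: {matches}"
                        self.logger.warning(mensaje)
                        self.gui.update_log(mensaje)
        except Exception as e:
            self.logger.error(f"Error en escaneo de archivos: {e}")
            self.gui.update_log(f"Error en escaneo de archivos: {e}")

    def monitor_system(self):
        """Poll CPU and memory usage every 5 s until stop() is called."""
        while not self.stop_event.is_set():
            cpu_percent = psutil.cpu_percent()
            memory_percent = psutil.virtual_memory().percent
            if cpu_percent > self.resource_threshold or memory_percent > self.resource_threshold:
                self.gui.update_log(f"⚠️ Alto uso de recursos - CPU: {cpu_percent}%, Memoria: {memory_percent}%")
            # wait() doubles as the 5 s sleep and returns early on stop().
            self.stop_event.wait(5)

    def start_network_monitoring(self):
        """Capture packets through analyze_packet until stop() is requested.

        The stop condition is evaluated per captured packet, so on a quiet
        interface the capture only ends with the next packet after stop().
        """
        try:
            self.gui.update_log("🛡️ Iniciando monitorización de red...")
            sniff(prn=self.analyze_packet, store=0,
                  stop_filter=lambda _pkt: self.stop_event.is_set())
        except Exception as e:
            self.gui.update_log(f"❌ Error en la captura de paquetes: {e}")

    def stop(self):
        """Ask both monitoring loops to finish; safe to call from any thread."""
        self.stop_event.set()


class SecurityApp:
    """Tkinter front end: a log view plus buttons for scanning and monitoring."""

    def __init__(self, root):
        self.root = root
        self.root.title("BlockMax - Sistema de Defensa")
        self.root.configure(bg="#1e1e1e")  # dark background
        self.monitor = SecurityMonitor(self)
        self.scan_path = ""
        # True while the monitor threads have been started and not stopped.
        self._monitoring = False

        style = ttk.Style()
        style.configure("TButton", font=("Arial", 12, "bold"), padding=10)
        style.configure("TLabel", font=("Arial", 14), background="#1e1e1e", foreground="white")

        # The logo is cosmetic: a missing or unreadable logo.png must not keep
        # the security tooling from starting.
        self.logo = None
        self.logo_label = None
        try:
            image = Image.open("logo.png")
            image = image.resize((150, 150), Image.LANCZOS)
        except OSError as e:
            self.monitor.logger.warning(f"No se pudo cargar logo.png: {e}")
        else:
            # Keep the PhotoImage referenced on self or Tk drops it.
            self.logo = ImageTk.PhotoImage(image)
            self.logo_label = ttk.Label(root, image=self.logo, background="#1e1e1e")
            self.logo_label.pack(pady=10)

        self.status_label = ttk.Label(root, text="Estado: Seguro", font=("Arial", 16, "bold"), background="#1e1e1e", foreground="green")
        self.status_label.pack(pady=10)
        self.log_area = scrolledtext.ScrolledText(root, width=80, height=20, state='disabled', bg="#2d2d2d", fg="white", font=("Courier", 12))
        self.log_area.pack(pady=10)
        self.path_button = ttk.Button(root, text="Seleccionar Carpeta", command=self.select_scan_path)
        self.path_button.pack(pady=5)
        self.scan_button = ttk.Button(root, text="Escanear archivos", command=self.start_file_scan)
        self.scan_button.pack(pady=5)
        self.start_button = ttk.Button(root, text="Iniciar monitoreo", command=self.start_monitoring)
        self.start_button.pack(pady=5)
        self.stop_button = ttk.Button(root, text="Detener monitoreo", command=self.stop_monitoring)
        self.stop_button.pack(pady=5)

    def _append_log(self, message):
        """Append *message* to the read-only log widget; main thread only."""
        self.log_area.config(state='normal')
        self.log_area.insert(tk.END, message + "\n")
        self.log_area.config(state='disabled')
        self.log_area.yview(tk.END)

    def update_log(self, message):
        """Append *message* to the GUI log from any thread.

        Tk widgets may only be touched by the thread running mainloop; the
        monitor threads therefore hand their messages over with root.after
        instead of writing to the widget directly.
        """
        if threading.current_thread() is threading.main_thread():
            self._append_log(message)
        else:
            self.root.after(0, self._append_log, message)

    def select_scan_path(self):
        """Ask for the folder to scan; a cancelled dialog keeps the previous choice."""
        chosen = filedialog.askdirectory()
        if not chosen:
            self.update_log("⚠️ Selección de carpeta cancelada.")
            return
        self.scan_path = chosen
        self.update_log(f"📁 Carpeta seleccionada: {self.scan_path}")

    def start_file_scan(self):
        """Scan the selected folder in a background thread."""
        if not self.scan_path:
            self.update_log("⚠️ Por favor, selecciona una carpeta antes de escanear.")
            return
        threading.Thread(target=self.monitor.scan_files, args=(self.scan_path,), daemon=True).start()

    def start_monitoring(self):
        """Launch the network and resource monitors once; repeated clicks are ignored."""
        if self._monitoring:
            self.update_log("⚠️ El monitoreo ya está en ejecución.")
            return
        self._monitoring = True
        self.monitor.stop_event.clear()
        threading.Thread(target=self.monitor.start_network_monitoring, daemon=True).start()
        threading.Thread(target=self.monitor.monitor_system, daemon=True).start()
        self.update_log("🛡️ Monitoreo iniciado...")

    def stop_monitoring(self):
        """Signal the monitor loops to finish (the sniffer exits on its next packet)."""
        if not self._monitoring:
            self.update_log("⚠️ El monitoreo no está en ejecución.")
            return
        self.monitor.stop()
        self._monitoring = False
        self.update_log("⏹️ Monitoreo detenido...")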
if __name__ == "__main__":
    # Build the Tk root, attach the application and block in the event loop.
    # The monitor and scan workers are daemon threads, so closing the window
    # ends the whole process.
    root = tk.Tk()
    app = SecurityApp(root)
    root.mainloop()