-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapi.py
More file actions
144 lines (119 loc) · 5.02 KB
/
api.py
File metadata and controls
144 lines (119 loc) · 5.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import ipaddress
import subprocess
import asyncio
import threading
import time
import json
import os
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from main_server.analyze_logs import Analyze_Logs
from main_server.llm_interaction import LLMInteraction
from connectionmanager import ConnectionManager
# Most recent list of LAN hosts (as strings) that answered a ping. scan_lan()
# rebinds this name after every completed sweep; notify_file() reads it.
online_ips_cache = []


def scan_lan(network="10.29.60.0/24"):
    """Ping-sweep *network* forever, publishing the hosts that respond.

    The function never returns, so it is meant to be the target of a
    background thread. Hosts are probed one at a time; after each complete
    pass the module-level ``online_ips_cache`` is rebound to the fresh list
    and the loop pauses for one second before starting over.

    Args:
        network: Network to scan, in CIDR notation. It is parsed with
            ``strict=False``, so an address with host bits set is accepted.

    Raises:
        ValueError: If *network* is not a valid IPv4/IPv6 network string.
    """
    global online_ips_cache

    # The network does not change between sweeps, so parse it once up front
    # instead of on every pass.
    net = ipaddress.ip_network(network, strict=False)

    while True:
        temp_online = []
        for ip in net.hosts():
            host = str(ip)
            # `-c 1 -W 1`: a single echo request with a reply timeout of 1
            # (seconds with Linux iputils ping -- presumably the target
            # platform; confirm before running elsewhere).
            # Both output streams are discarded: only the exit status matters,
            # and ping's own diagnostics would otherwise flood the console.
            result = subprocess.run(
                ['ping', '-c', '1', '-W', '1', host],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            if result.returncode == 0:
                temp_online.append(host)

        # Rebind (rather than mutate in place) so readers always see the
        # result of a finished sweep, never a partially filled list.
        online_ips_cache = temp_online
        time.sleep(1)
# Start the LAN sweep as soon as this module is imported. The thread is a
# daemon, so it does not keep the interpreter alive at shutdown.
threading.Thread(target=scan_lan, daemon=True).start()
class FileModified(BaseModel):
    """Request body of ``POST /notify_file``: a change report for one file."""

    # Name or path of the file that changed; notify_file() routes on its
    # basename only.
    file_name: str
    # The content reported for that file; notify_file() treats it as log text
    # for access.log and as a JSON document for eve.json.
    new_info: str
# FastAPI application that the route decorators below register handlers on.
app = FastAPI()
# Shared LLM client; notify_file() queries it through ask_llm().
llm = LLMInteraction()
# Tracks connected WebSocket clients (connect/disconnect) and broadcasts to them.
conn_manager = ConnectionManager()
# Per-client analysis state, keyed by the value returned from get_ip().
# NOTE(review): this name shadows the builtin `dict` for the whole module;
# renaming it means updating notify_file() in the same change.
dict = {}
@app.get("/")
async def read_root():
    """Return a fixed greeting payload for ``GET /``."""
    greeting = {"Hello": "World"}
    return greeting
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Register a WebSocket client and log its messages until it goes away.

    The socket is handed to ``conn_manager`` so it receives broadcasts, then
    every text frame the client sends is printed. The handler ends when the
    client disconnects or when receiving fails for any other reason.

    Args:
        websocket: The incoming WebSocket connection for ``/ws``.
    """
    await conn_manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(f"Received from client: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")
    finally:
        # Unregister on every exit path, not just a clean disconnect. If an
        # unexpected error or a task cancellation left the socket registered,
        # later broadcasts would keep targeting a dead connection.
        conn_manager.disconnect(websocket)
@app.post("/notify_file")
async def notify_file(file: FileModified):
    """Handle a file-change notification and fan results out to WebSocket clients.

    Routing is by the basename of ``file.file_name``:

    * ``access.log`` -- ``file.new_info`` is run through ``Analyze_Logs``, the
      result is merged into the module-level per-IP table, the LLM is queried,
      and a ``"log"`` message followed by a ``"threat analysis"`` message is
      broadcast.
    * ``eve.json`` -- ``file.new_info`` is parsed as a single JSON event;
      ``flow`` and ``alert`` events are broadcast as ``"network_flow"`` and
      ``"threat"`` messages, any other event type is ignored.

    Finally the current ``online_ips_cache`` is broadcast as an ``"ip_data"``
    message.

    Args:
        file: The notification body (file name plus the reported content).

    Returns:
        The same ``file`` object that was received.
    """
    # Only the basename decides the route; the directory part is ignored.
    file_basename = os.path.basename(file.file_name)
    if file_basename == "access.log":
        analizer = Analyze_Logs()
        analysis_result = analizer.analyze_logs(file.new_info)
        print(f"File modified! {file.file_name}\nContent: {file.new_info}")
        # First notification from an IP stores the analysis object itself as
        # the tracked entry; later ones are folded into it via update().
        if analysis_result.get_ip() in dict:
            dict[analysis_result.get_ip()].update(analysis_result.last_request)
        else:
            dict[analysis_result.get_ip()] = analysis_result
        # NOTE(review): the llm_response values assigned in the next two
        # branches are only printed; the unconditional ask_llm() call further
        # down overwrites them before anything is broadcast -- confirm intent.
        llm_response = None
        if analysis_result.get_matched_rules() != []:
            llm_response = llm.ask_llm(analysis_result.to_str())
            print(f"LLM Response: {llm_response}")
        # A tracked average rate above 3 is flagged as a high request rate
        # (the unit of avg_rate is not visible from here).
        if dict[analysis_result.get_ip()].avg_rate > 3:
            dict[analysis_result.get_ip()].add_matched_rules("High request rate detected")
            llm_response = llm.ask_llm(dict[analysis_result.get_ip()].to_str())
            print(f"LLM Response: {llm_response}")
            # Reset after reporting -- presumably so the same burst is not
            # flagged again on the next request; confirm.
            dict[analysis_result.get_ip()].avg_rate = 0
        # NOTE(review): the two broadcasts below are assumed to run for every
        # access.log notification, not only inside the high-rate branch --
        # confirm the nesting against the original file.
        await conn_manager.broadcast({
            "type": "log",
            "ip_address": analysis_result.ip_address,
            "request_url": analysis_result.requested_url,
            "matched_rules": len(analysis_result.matched_rules),
        })
        llm_response = llm.ask_llm(analysis_result.to_str())
        # NOTE(review): ask_llm() is assumed to return a mapping containing
        # all four keys; a None or partial reply would raise here -- confirm.
        await conn_manager.broadcast({
            "type": "threat analysis",
            "alert_title": llm_response['alert_title'],
            "alert_description": llm_response['alert_description'],
            "severity_level": llm_response['severity_level'],
            "recommended": llm_response['recommended']
        })
    elif file_basename == "eve.json":
        # eve.json content is treated as one Suricata event encoded as JSON
        # (see the error message below).
        try:
            eve_event = json.loads(file.new_info)
            event_type = eve_event.get('event_type')
            timestamp = eve_event.get('timestamp')
            if event_type == 'flow':
                flow_data = eve_event.get('flow', {})
                await conn_manager.broadcast({
                    "type": "network_flow",
                    "timestamp": timestamp,
                    # Prefer the application protocol; fall back to 'proto'
                    # when 'app_proto' is absent.
                    "protocol": eve_event.get('app_proto', eve_event.get('proto')),
                    "source": eve_event.get('src_ip'),
                    "destination": eve_event.get('dest_ip'),
                    "port": eve_event.get('dest_port'),
                    # Only the to-server counters are forwarded.
                    "packets": flow_data.get('pkts_toserver'),
                    "bytes": flow_data.get('bytes_toserver'),
                    "status": flow_data.get('state')
                })
            elif event_type == 'alert':
                alert_data = eve_event.get('alert', {})
                await conn_manager.broadcast({
                    "type": "threat",
                    "timestamp": timestamp,
                    "signature": alert_data.get('signature'),
                    "severity": alert_data.get('severity'),
                    "category": alert_data.get('category'),
                    "source": eve_event.get('src_ip'),
                    "destination": eve_event.get('dest_ip'),
                    "port": eve_event.get('dest_port')
                })
        except json.JSONDecodeError:
            print(f"Error: Could not decode Suricata JSON: {file.new_info}")
        except Exception as e:
            # Catch-all: any other failure in this branch (including a failed
            # broadcast) is printed instead of failing the request.
            print(f"An error occurred processing eve.json: {e}")
    # NOTE(review): assumed to sit at function level, i.e. sent for every
    # notification regardless of file name -- confirm the nesting.
    await conn_manager.broadcast({
        "type": "ip_data",
        "ip_addresses": online_ips_cache
    })
    return file