forked from RaizeTheLimit/ProtoDecoderUI
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.py
More file actions
313 lines (257 loc) · 11 KB
/
index.py
File metadata and controls
313 lines (257 loc) · 11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/env python3
"""
Utils - EXACT replica of src/utils/index.ts
100% compliant with original JavaScript
"""
import base64
import json
import logging
import socket
import requests
import threading
import atexit
from pathlib import Path
from typing import Dict, Any, List, Optional
# Import parser functions - EXACT replica of JavaScript imports
from parser.proto_parser import decodePayloadTraffic
def b64Decode(data: str) -> bytes:
    """Decode base64 text into raw bytes.

    Empty or missing input yields ``b""``. String input whose trailing
    ``=`` padding was stripped is re-padded before decoding, so it decodes
    instead of raising. Non-string input is handed to ``base64.b64decode``
    unchanged.

    Args:
        data: Base64-encoded text.

    Returns:
        The decoded bytes, or ``b""`` when ``data`` is empty or ``None``.

    Raises:
        binascii.Error: If the input cannot be decoded even after padding.
    """
    if not data:
        return b""
    if isinstance(data, str):
        # -len % 4 is 0 for a correctly sized string, so valid input is untouched.
        data += "=" * (-len(data) % 4)
    return base64.b64decode(data)
def moduleConfigIsAvailable() -> bool:
    """Report whether ``config/config.json`` exists.

    The path is relative, so it is resolved against the current working
    directory. Any error raised while checking is treated as "not available".

    Returns:
        ``True`` when the path exists, ``False`` otherwise.
    """
    config_file = Path("config") / "config.json"
    try:
        available = config_file.exists()
    except Exception:
        available = False
    return available
def getIPAddress() -> str:
    """Return a non-loopback IPv4 address for this host, best effort.

    The host name is resolved with ``socket.getaddrinfo`` and the first
    IPv4 result outside ``127.0.0.0/8`` is returned. If there is none, the
    ``socket.gethostbyname`` result is used when it is not a loopback
    address. The whole 127/8 range is rejected, not just ``127.0.0.1``,
    because a host name can be mapped to another loopback address such as
    ``127.0.1.1``.

    Returns:
        A dotted-quad IPv4 string, or ``'0.0.0.0'`` when no usable address
        is found or name resolution fails.
    """
    try:
        hostname = socket.gethostname()
        fallback_ip = socket.gethostbyname(hostname)
        for family, _socktype, _proto, _canonname, sockaddr in socket.getaddrinfo(hostname, None):
            address = sockaddr[0]
            if family == socket.AF_INET and not address.startswith('127.'):
                return address
        return '0.0.0.0' if fallback_ip.startswith('127.') else fallback_ip
    except Exception:
        # Deliberate best effort: any resolution failure yields the wildcard.
        return '0.0.0.0'
class WebStreamBuffer:
    """Lock-protected buffer that queues written items and notifies callbacks.

    ``write`` appends an item and then calls every registered callback with
    it; ``read`` returns and removes everything queued so far. After
    ``_cleanup`` has run (it is registered with ``atexit``), writes are
    ignored and reads return an empty list.

    Callbacks are always invoked *without* the internal lock held. The lock
    is a plain ``threading.Lock`` (not reentrant), so a callback that calls
    back into ``read``/``write``/``on_data`` would otherwise deadlock.
    """

    def __init__(self):
        self.data = []        # items written since the last read()
        self.callbacks = []   # callables invoked with each written item
        self.lock = threading.Lock()
        self._shutdown = False
        # Drop buffered state at interpreter exit. atexit keeps a reference
        # to the bound method, so the instance stays alive until then.
        atexit.register(self._cleanup)

    def _cleanup(self):
        """Mark the buffer as shut down and drop queued items and callbacks."""
        with self.lock:
            self._shutdown = True
            self.data.clear()
            self.callbacks.clear()

    @staticmethod
    def _debug(message: str) -> None:
        """Print a debug line; a closed or broken stdout is ignored."""
        try:
            print(message)
        except (OSError, ValueError):
            # Diagnostics only: stdout may already be gone during shutdown.
            pass

    def write(self, data: Any) -> None:
        """Queue ``data`` and pass it to every registered callback.

        Items of any type are stored; only the debug line looks for a
        ``'methodName'`` key, and only when ``data`` is a dict.
        """
        if self._shutdown:
            return
        with self.lock:
            if self._shutdown:
                return
            self.data.append(data)
            size = len(self.data)
        method_name = data.get('methodName', 'Unknown') if isinstance(data, dict) else 'Unknown'
        self._debug(f"WebStreamBuffer.write called: {method_name}")
        self._debug(f"Buffer size after write: {size}")
        self._notify_callbacks_safe(data)

    def read(self) -> List[Any]:
        """Return all queued items and empty the buffer."""
        if self._shutdown:
            return []
        with self.lock:
            if self._shutdown:
                return []
            items = self.data.copy()
            self.data.clear()
        self._debug(f"WebStreamBuffer.read called: {len(items)} items available")
        self._debug(f"WebStreamBuffer.read returning: {len(items)} items")
        return items

    def on_data(self, callback) -> None:
        """Register ``callback`` to be called with each item passed to ``write``."""
        with self.lock:
            if not self._shutdown:
                self.callbacks.append(callback)

    def _notify_callbacks_safe(self, data: Any) -> None:
        """Call every registered callback with ``data``.

        Must be called without ``self.lock`` held. The callback list is
        snapshotted under the lock and the callbacks run outside it. An
        exception raised by a callback is logged and does not stop the
        remaining callbacks.
        """
        with self.lock:
            if self._shutdown:
                return
            snapshot = list(self.callbacks)
        for callback in snapshot:
            try:
                callback(data)
            except Exception as e:
                try:
                    logging.error(f"Error in callback: {e}")
                except Exception:
                    # Logging may itself be unavailable during shutdown.
                    pass
def _write_parsed_to_buffer(buffer: "WebStreamBuffer", parsed: Any, identifier: Any, direction: str) -> None:
    """Forward one decode result to ``buffer``.

    A string result is treated as an error message and written as
    ``{"error": <message>}``. Anything else is iterated, and each item is
    tagged with ``identifier`` before being written.
    """
    if isinstance(parsed, str):
        buffer.write({"error": parsed})
        logging.warning(f"{direction.capitalize()} parsing error: {parsed}")
        return
    for parsed_object in parsed:
        parsed_object['identifier'] = identifier
        buffer.write(parsed_object)
        logging.info(f"Wrote {direction} data to buffer: {parsed_object.get('methodName', 'Unknown')}")


def handleData(incoming: "WebStreamBuffer", outgoing: "WebStreamBuffer", identifier: Any, parsedData: Dict[str, Any], sampleSaver: Optional[Any] = None) -> None:
    """Decode every proto in a traffic payload and route it to the buffers.

    Args:
        incoming: Buffer that receives decoded request objects.
        outgoing: Buffer that receives decoded response objects.
        identifier: Value stored under ``'identifier'`` on every decoded object.
        parsedData: Traffic payload; must be a dict whose ``'protos'`` entry
            is a non-empty list. Each proto is read for the keys
            ``'request'``, ``'response'`` and ``'method'``.
        sampleSaver: Optional object with a ``save_pair`` method; when given,
            the first decoded request/response pair of each proto is saved.

    Invalid payloads are logged and ignored. Proto entries that are not
    dicts are logged and skipped. Missing or ``None`` request/response
    payloads are treated as empty strings.
    """
    logging.info(f"handleData called with identifier: {identifier}, parsedData: {parsedData}")

    if not isinstance(parsedData, dict) or 'protos' not in parsedData:
        logging.error("Invalid traffic data: 'protos' field missing or not a dict")
        return
    protos = parsedData['protos']
    if not isinstance(protos, list):
        logging.error("Invalid traffic data: 'protos' field is not a list")
        return
    if not protos:
        logging.error("Invalid traffic data: 'protos' array is empty")
        return

    logging.info(f"Processing {len(protos)} protos")

    for i, proto_item in enumerate(protos):
        if not isinstance(proto_item, dict):
            logging.error(f"Skipping proto {i}: expected a dict, got {type(proto_item).__name__}")
            continue

        # `or ""` also covers an explicit None (JSON null) payload.
        raw_request = proto_item.get('request') or ""
        raw_response = proto_item.get('response') or ""
        method_id = proto_item.get('method', 0)
        logging.info(f"Processing proto {i}: method={method_id}, request_len={len(raw_request)}, response_len={len(raw_response)}")

        parsed_request_data = decodePayloadTraffic(method_id, raw_request, "request")
        parsed_response_data = decodePayloadTraffic(method_id, raw_response, "response")
        logging.info(f"Request parsing result: {len(parsed_request_data)} items")
        logging.info(f"Response parsing result: {len(parsed_response_data)} items")

        # A string result is an error message, not a list of decoded
        # objects; indexing it would save its first character as a sample.
        both_decoded = not isinstance(parsed_request_data, str) and not isinstance(parsed_response_data, str)
        if sampleSaver and both_decoded and len(parsed_request_data) > 0 and len(parsed_response_data) > 0:
            try:
                sampleSaver.save_pair(
                    parsed_request_data[0],
                    parsed_response_data[0],
                    raw_request,
                    raw_response,
                    "traffic"
                )
                logging.info("Sample saved successfully")
            except Exception as e:
                logging.error(f"Error saving sample: {e}")

        _write_parsed_to_buffer(incoming, parsed_request_data, identifier, "request")
        _write_parsed_to_buffer(outgoing, parsed_response_data, identifier, "response")

    logging.info(f"handleData completed for identifier: {identifier}")
def redirect_post_golbat(redirect_url: str, redirect_token: str, redirect_data: Any) -> Optional[Dict[str, Any]]:
    """POST ``redirect_data`` to ``redirect_url`` as a JSON body.

    Args:
        redirect_url: Target URL.
        redirect_token: Bearer token; when empty, no ``Authorization``
            header is sent.
        redirect_data: Payload. A ``str``/``bytes``/``bytearray`` value is
            taken to be an already-serialized body and is sent verbatim
            (text is UTF-8 encoded). Any other value is serialized by
            ``requests`` via ``json=``.

    Returns:
        A dict with ``status_code``, ``response_text`` and ``headers`` of
        the response, or ``None`` when the request fails (the error is
        logged).
    """
    headers = {
        "Content-Type": "application/json"
    }
    if redirect_token:
        headers["Authorization"] = "Bearer " + redirect_token

    # Passing pre-serialized JSON through json= would encode it a second
    # time, and the receiver would get a JSON string instead of the document.
    if isinstance(redirect_data, str):
        body = {"data": redirect_data.encode("utf-8")}
    elif isinstance(redirect_data, (bytes, bytearray)):
        body = {"data": bytes(redirect_data)}
    else:
        body = {"json": redirect_data}

    try:
        response = requests.post(
            redirect_url,
            headers=headers,
            timeout=30,
            **body
        )
        return {
            "status_code": response.status_code,
            "response_text": response.text,
            "headers": dict(response.headers)
        }
    except Exception as e:
        logging.error(f"Error in redirect_post_golbat: {e}")
        return None
class SampleSaver:
    """Keep recent request/response samples in memory and write each to disk.

    Configuration keys read from ``config``:
        enabled: When falsy, ``save_pair`` does nothing (default ``False``).
        save_path: Directory for the JSON files (default ``'samples'``).
        max_samples: Cap on the in-memory ``samples`` list (default ``1000``).
    """

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.enabled = config.get('enabled', False)
        self.save_path = config.get('save_path', 'samples')
        self.max_samples = config.get('max_samples', 1000)
        self.samples = []
        # parents=True so a nested save_path such as "out/samples" works.
        Path(self.save_path).mkdir(parents=True, exist_ok=True)

    def save_pair(self, request_data: Dict[str, Any], response_data: Dict[str, Any],
                  raw_request: str, raw_response: str, data_type: str) -> None:
        """Record one request/response pair and write it to a JSON file.

        The sample is appended to ``self.samples`` (the oldest entry is
        dropped once ``max_samples`` is exceeded) and written to
        ``<save_path>/<data_type>_<timestamp>.json``. Errors are logged and
        never raised.

        Args:
            request_data: Decoded request.
            response_data: Decoded response.
            raw_request: Request payload as received.
            raw_response: Response payload as received.
            data_type: Label stored in the sample and used as the file
                name prefix.
        """
        if not self.enabled:
            return
        try:
            sample = {
                'timestamp': self._get_timestamp(),
                'data_type': data_type,
                'request': {
                    'raw': raw_request,
                    'parsed': request_data
                },
                'response': {
                    'raw': raw_response,
                    'parsed': response_data
                }
            }
            self.samples.append(sample)
            if len(self.samples) > self.max_samples:
                self.samples.pop(0)

            # Serialize before opening the file so a sample that cannot be
            # encoded does not leave a truncated JSON file behind.
            serialized = json.dumps(sample, indent=2)
            filename = f"{data_type}_{sample['timestamp']}.json"
            filepath = Path(self.save_path) / filename
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(serialized)
        except Exception as e:
            logging.error(f"Error saving sample: {e}")

    def _get_timestamp(self) -> str:
        """Return the current local time in ISO format with ':' replaced by '-'."""
        from datetime import datetime
        return datetime.now().isoformat().replace(':', '-')
# Public API: the names exported by a star-import of this module.
__all__ = [
    'b64Decode',
    'moduleConfigIsAvailable',
    'getIPAddress',
    'handleData',
    'redirect_post_golbat',
    'WebStreamBuffer',
    'SampleSaver',
]