-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdatabase.py
More file actions
276 lines (245 loc) · 10.5 KB
/
database.py
File metadata and controls
276 lines (245 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
import sqlite3
from datetime import datetime
from typing import List, Optional
class SocialDatabase:
    """SQLite-backed local store for the social client.

    Manages three tables created by :meth:`create_tables`:

    * ``users`` - the locally known account (the code keeps a single row,
      see :meth:`upsert_user_token` and :meth:`get_only_user`),
    * ``analysis_keys`` - keys attached to a session and a user,
    * ``servers`` - known server URLs, one of which is flagged ``selected``.

    Rows are returned as plain ``dict`` objects. The connection is opened with
    ``check_same_thread=False``; this class adds no locking of its own, so
    callers sharing one instance between threads must serialise access.
    """

    # Server seeded by create_tables() so a fresh database has one entry.
    DEFAULT_SERVER_URL = "https://aetheronepysocial.emolio.nl"
    DEFAULT_SERVER_DESCRIPTION = "AetherOnePy Social Server"

    def __init__(self, db_path: str):
        """Open (or create) the database at *db_path* and ensure the schema exists."""
        print(f"[DEBUG] Initializing SocialDatabase at {db_path}")
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path, check_same_thread=False)
        # sqlite3.Row lets rows be converted with dict(row) and indexed by name.
        self.conn.row_factory = sqlite3.Row
        self.create_tables()

    def create_tables(self):
        """Create all tables if missing and seed the default server.

        The default server is inserted with ``INSERT OR IGNORE`` (no-op when
        its URL is already present). It is only flagged as selected when no
        other server is currently selected, so re-running this method never
        produces two selected servers.
        """
        print("[DEBUG] Creating tables in SocialDatabase if not exist")
        cursor = self.conn.cursor()
        # Create users table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                server_user_id INTEGER NOT NULL,
                username TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                token TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        # Create analysis_keys table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS analysis_keys (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                key_id INTEGER NOT NULL,
                key TEXT UNIQUE NOT NULL,
                session_id INTEGER NOT NULL,
                user_id INTEGER NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                expires_at TIMESTAMP,
                status TEXT DEFAULT 'active',
                metadata TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )
        ''')
        # Create servers table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS servers (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                description TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                selected INTEGER DEFAULT 0
            )
        ''')
        # Keep the "at most one selected server" invariant that add_server()
        # and set_selected_server() maintain: only select the seeded default
        # when nothing else is selected yet.
        cursor.execute('SELECT 1 FROM servers WHERE selected = 1 LIMIT 1')
        select_default = 0 if cursor.fetchone() else 1
        cursor.execute('''
            INSERT OR IGNORE INTO servers (url, description, selected)
            VALUES (?, ?, ?)
        ''', (self.DEFAULT_SERVER_URL, self.DEFAULT_SERVER_DESCRIPTION, select_default))
        self.conn.commit()

    def save_user(self, username: str, email: str, token: str, server_user_id: int) -> int:
        """Insert a user, replacing any existing row with the same email.

        ``INSERT OR REPLACE`` deletes the conflicting row and inserts a new
        one, so an existing user receives a new ``id``.
        NOTE(review): ``analysis_keys.user_id`` rows pointing at the old id
        are not updated here - confirm whether callers rely on that.

        Returns:
            The row id of the inserted user.
        """
        cursor = self.conn.cursor()
        # The token is a credential: never write it to stdout/logs.
        print(f"[DEBUG] Saving user: {username}, {email}, <token redacted>, {server_user_id}")
        cursor.execute('''
            INSERT OR REPLACE INTO users (username, email, token, server_user_id)
            VALUES (?, ?, ?, ?)
        ''', (username, email, token, server_user_id))
        self.conn.commit()
        print(f"[DEBUG] User saved: {cursor.lastrowid}")
        return cursor.lastrowid

    def update_user_token(self, email: str, token: str) -> bool:
        """Set the token of the user with *email*; return True if a row changed."""
        cursor = self.conn.cursor()
        # The token is a credential: never write it to stdout/logs.
        print(f"[DEBUG] Updating user token: {email}, <token redacted>")
        cursor.execute('''
            UPDATE users SET token = ? WHERE email = ?
        ''', (token, email))
        self.conn.commit()
        return cursor.rowcount > 0

    def get_user_by_email(self, email: str) -> Optional[dict]:
        """Return the user row for *email* as a dict, or None if absent."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM users WHERE email = ?', (email,))
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_only_user(self) -> Optional[dict]:
        """Return the most recently inserted user (highest id), or None.

        Only one user is meant to live in the users table, because there is
        no need for multiple users.
        """
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM users ORDER BY id DESC LIMIT 1')
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_user_token(self, email: str) -> Optional[str]:
        """Return the stored token for *email*, or None if the user is absent."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT token FROM users WHERE email = ?', (email,))
        row = cursor.fetchone()
        return row['token'] if row else None

    # Analysis Keys CRUD operations
    def create_analysis_key(self, key_id: int, key: str, session_id: int, user_id: int,
                            expires_at: Optional[datetime] = None,
                            metadata: Optional[str] = None) -> int:
        """Create a new analysis key and return its row id.

        Raises:
            sqlite3.IntegrityError: if *key* already exists (column is UNIQUE).
        """
        cursor = self.conn.cursor()
        print(f"key_id: {key_id}, key: {key}, session_id: {session_id}, user_id: {user_id}, expires_at: {expires_at}, metadata: {metadata}")
        if isinstance(expires_at, datetime):
            # Same text form the implicit sqlite3 datetime adapter produced;
            # converting explicitly avoids relying on that adapter, which is
            # deprecated since Python 3.12.
            expires_at = expires_at.isoformat(" ")
        cursor.execute('''
            INSERT INTO analysis_keys (key_id, key, session_id, user_id, expires_at, metadata)
            VALUES (?, ?, ?, ?, ?, ?)
        ''', (key_id, key, session_id, user_id, expires_at, metadata))
        self.conn.commit()
        return cursor.lastrowid

    def get_analysis_key(self, key: str) -> Optional[dict]:
        """Get analysis key by key string; None if not found."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM analysis_keys WHERE key = ?
        ''', (key,))
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_analysis_key_id(self, key_id: int) -> Optional[dict]:
        """Get the first analysis key row matching *key_id*; None if not found."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM analysis_keys WHERE key_id = ?
        ''', (key_id,))
        row = cursor.fetchone()
        return dict(row) if row else None

    def get_analysis_keys_by_user(self, user_id: int) -> List[dict]:
        """Get all analysis keys for a user, newest first."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM analysis_keys
            WHERE user_id = ?
            ORDER BY created_at DESC
        ''', (user_id,))
        return [dict(row) for row in cursor.fetchall()]

    def get_analysis_keys_by_analysis(self, analysis_id: int) -> List[dict]:
        """Get all keys for a specific analysis, newest first.

        NOTE(review): the ``analysis_keys`` schema created by create_tables()
        has no ``analysis_id`` column, so on a database created by this class
        the query raises sqlite3.OperationalError. Left unchanged until the
        intended column is confirmed.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT * FROM analysis_keys
            WHERE analysis_id = ?
            ORDER BY created_at DESC
        ''', (analysis_id,))
        return [dict(row) for row in cursor.fetchall()]

    def update_analysis_key_status(self, key: str, status: str) -> bool:
        """Update analysis key status; return True if a row changed."""
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE analysis_keys
            SET status = ?
            WHERE key = ?
        ''', (status, key))
        self.conn.commit()
        return cursor.rowcount > 0

    def update_analysis_key_metadata(self, key: str, metadata: str) -> bool:
        """Update analysis key metadata; return True if a row changed."""
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE analysis_keys
            SET metadata = ?
            WHERE key = ?
        ''', (metadata, key))
        self.conn.commit()
        return cursor.rowcount > 0

    def delete_analysis_key(self, key: str) -> bool:
        """Delete an analysis key; return True if a row was removed."""
        cursor = self.conn.cursor()
        cursor.execute('''
            DELETE FROM analysis_keys
            WHERE key = ?
        ''', (key,))
        self.conn.commit()
        return cursor.rowcount > 0

    def cleanup_expired_keys(self) -> int:
        """Remove expired analysis keys and return how many were deleted.

        Rows with a NULL ``expires_at`` never expire. The comparison is done
        by SQLite against CURRENT_TIMESTAMP (UTC).
        NOTE(review): this assumes ``expires_at`` values are stored as UTC
        ``YYYY-MM-DD HH:MM:SS`` text - confirm against callers.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            DELETE FROM analysis_keys
            WHERE expires_at IS NOT NULL
            AND expires_at < CURRENT_TIMESTAMP
        ''')
        self.conn.commit()
        return cursor.rowcount

    def deactivate_analysis_keys(self, analysis_id: int) -> int:
        """Deactivate all keys for an analysis; return the number of rows changed.

        NOTE(review): the ``analysis_keys`` schema created by create_tables()
        has no ``analysis_id`` column, so on a database created by this class
        the statement raises sqlite3.OperationalError. Left unchanged until
        the intended column is confirmed.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE analysis_keys
            SET status = 'inactive'
            WHERE analysis_id = ?
        ''', (analysis_id,))
        self.conn.commit()
        return cursor.rowcount

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def upsert_user_token(self, username: str, email: str, token: str,
                          server_user_id: Optional[int] = None) -> bool:
        """Make *email* the single stored user, creating or updating its row.

        All users with a different email are deleted. If a row for *email*
        exists, its username/token (and ``server_user_id`` when given) are
        updated; otherwise a new row is inserted.

        The whole operation runs in one transaction: if any statement fails,
        the deletion of the other users is rolled back as well.

        Raises:
            sqlite3.IntegrityError: when no row for *email* exists and
                *server_user_id* is None (the column is NOT NULL).
        """
        # Column names are fixed literals (never caller input), so building
        # the statement text from them is safe; values stay parameterised.
        columns = ['username', 'email', 'token']
        values = [username, email, token]
        if server_user_id is not None:
            columns.append('server_user_id')
            values.append(server_user_id)

        # "with connection" commits on success and rolls back on exception.
        with self.conn:
            cursor = self.conn.cursor()
            # Always keep only one user: delete all others
            cursor.execute('DELETE FROM users WHERE email != ?', (email,))
            cursor.execute('SELECT id FROM users WHERE email = ?', (email,))
            row = cursor.fetchone()
            if row:
                # User exists, update the provided fields
                assignments = ', '.join(f'{column} = ?' for column in columns)
                cursor.execute(
                    f'UPDATE users SET {assignments} WHERE id = ?',
                    values + [row['id']],
                )
            else:
                # No user exists, insert new
                column_list = ', '.join(columns)
                placeholders = ', '.join('?' * len(columns))
                cursor.execute(
                    f'INSERT INTO users ({column_list}) VALUES ({placeholders})',
                    values,
                )
        return True

    def add_server(self, url: str, description: Optional[str] = None,
                   selected: bool = False) -> int:
        """Add a server and return its row id.

        When *selected* is true, every other server is unselected in the same
        transaction, so a failed insert leaves the current selection intact.

        Raises:
            sqlite3.IntegrityError: if *url* already exists (column is UNIQUE).
        """
        # "with connection" commits on success and rolls back on exception.
        with self.conn:
            cursor = self.conn.cursor()
            if selected:
                # Unselect all other servers
                cursor.execute('UPDATE servers SET selected = 0')
            cursor.execute('''
                INSERT INTO servers (url, description, selected) VALUES (?, ?, ?)
            ''', (url, description, int(selected)))
        return cursor.lastrowid

    def set_selected_server(self, server_id: int):
        """Flag *server_id* as the selected server and unselect all others.

        If no server has that id, every server ends up unselected.
        """
        cursor = self.conn.cursor()
        # One statement flips both the old and the new selection atomically.
        cursor.execute(
            'UPDATE servers SET selected = CASE WHEN id = ? THEN 1 ELSE 0 END',
            (server_id,),
        )
        self.conn.commit()

    def get_servers(self) -> List[dict]:
        """Return all servers as dicts, newest first."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM servers ORDER BY created_at DESC')
        return [dict(row) for row in cursor.fetchall()]

    def get_server_by_id(self, server_id: int) -> Optional[dict]:
        """Return the server row with *server_id* as a dict, or None if absent."""
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM servers WHERE id = ?', (server_id,))
        row = cursor.fetchone()
        return dict(row) if row else None

    def delete_server(self, server_id: int) -> bool:
        """Delete the server with *server_id*; return True if a row was removed."""
        cursor = self.conn.cursor()
        cursor.execute('DELETE FROM servers WHERE id = ?', (server_id,))
        self.conn.commit()
        return cursor.rowcount > 0

    def list_all_sessions(self):
        """Return all sessions across all cases.

        NOTE(review): this class neither creates a ``sessions`` table nor
        defines ``_row_to_session``; the method looks carried over from
        another database class and fails on a database created here unless
        both are provided elsewhere. Left unchanged pending confirmation.
        """
        cursor = self.conn.cursor()
        cursor.execute('SELECT * FROM sessions ORDER BY created DESC')
        rows = cursor.fetchall()
        return [self._row_to_session(row) for row in rows]