-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprotocol.py
More file actions
68 lines (48 loc) · 1.97 KB
/
protocol.py
File metadata and controls
68 lines (48 loc) · 1.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
import os
import base64
import json
def derive_session_key(shared_secret: bytes) -> bytes:
# Deriva una clave de sesión a partir de un secreto compartido.
return HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b'session_key'
).derive(shared_secret)
def encrypt_message(message: str, session_key: bytes) -> tuple:
# Cifra un mensaje usando AES-GCM con la clave de sesión proporcionada.
aesgcm = AESGCM(session_key)
nonce = os.urandom(12)
ciphertext = aesgcm.encrypt(nonce, message.encode(), None)
return (nonce, ciphertext)
def decrypt_message(nonce: bytes, ciphertext: bytes, session_key: bytes) -> str:
# Descifra un mensaje cifrado usando AES-GCM.
aesgcm = AESGCM(session_key)
return aesgcm.decrypt(nonce, ciphertext, None).decode()
def b64encode_bytes(data: bytes) -> str:
# Codifica datos en bytes a una cadena en base64.
return base64.b64encode(data).decode()
def b64decode_bytes(data: str) -> bytes:
# Decodifica una cadena en base64 a bytes.
return base64.b64decode(data)
def json_send(sock, obj: dict):
# Envía un objeto JSON a través de un socket.
data = json.dumps(obj).encode()
sock.sendall(len(data).to_bytes(4, "big") + data)
def json_recv(sock) -> dict:
# Recibe un objeto JSON a través de un socket.
length_bytes = sock.recv(4)
if len(length_bytes) < 4:
raise ValueError("No se recibió longitud completa del mensaje")
length = int.from_bytes(length_bytes, "big")
data = b''
while len(data) < length:
packet = sock.recv(length - len(data))
if not packet:
raise ValueError("Conexión cerrada inesperadamente")
data += packet
return json.loads(data.decode())