-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathvector_client.py
More file actions
83 lines (69 loc) · 3.1 KB
/
vector_client.py
File metadata and controls
83 lines (69 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
"""Client for interacting with the external vector store service."""
from __future__ import annotations
from typing import Any, Dict, Optional
import requests
from config import VECTOR_SERVICE
class VectorClient:
"""Simple HTTP client used to store deduplication hashes in NRS Vector."""
def __init__(self) -> None:
self.enabled: bool = bool(VECTOR_SERVICE.get("enabled", False))
self.base_url: str = VECTOR_SERVICE.get("base_url", "").rstrip("/")
self.timeout: int = int(VECTOR_SERVICE.get("timeout", 10))
self.api_key: Optional[str] = VECTOR_SERVICE.get("api_key")
def _build_headers(self) -> Dict[str, str]:
headers = {"Content-Type": "application/json"}
if self.api_key:
headers["Authorization"] = f"Bearer {self.api_key}"
return headers
def store_document(self, document_id: str, text: str, metadata: Dict[str, Any]) -> bool:
"""Persist a document in the vector store.
Returns True when the call succeeds, False otherwise.
"""
if not self.enabled:
return False
if not self.base_url:
print("[WARN] Vector service base_url is empty, skip storing document.")
return False
payload = {
"document_id": document_id,
"text": text,
"metadata": metadata,
}
url = f"{self.base_url}/vectors/documents"
try:
response = requests.post(url, json=payload, headers=self._build_headers(), timeout=self.timeout)
response.raise_for_status()
data = response.json()
status = data.get("status", "").lower()
return status in {"stored", "queued", "updated"}
except requests.RequestException as exc:
print(f"[WARN] Failed to store document in vector service: {exc}")
return False
def document_exists(self, document_id: str) -> bool:
"""Check whether a document is already stored."""
if not self.enabled or not self.base_url:
return False
url = f"{self.base_url}/vectors/documents/{document_id}"
try:
response = requests.get(url, headers=self._build_headers(), timeout=self.timeout)
if response.status_code == 200:
return True
if response.status_code == 404:
return False
response.raise_for_status()
except requests.RequestException as exc:
print(f"[WARN] Failed to query document {document_id}: {exc}")
return False
def clear_documents(self) -> bool:
"""Request the vector service to delete all stored documents."""
if not self.enabled or not self.base_url:
return False
url = f"{self.base_url}/vectors/documents"
try:
response = requests.delete(url, headers=self._build_headers(), timeout=self.timeout)
response.raise_for_status()
return True
except requests.RequestException as exc:
print(f"[WARN] Failed to clear vector documents: {exc}")
return False
vector_client = VectorClient()