Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a645692
adapt remote client
lautarolecumberry Dec 1, 2025
192aef5
make number.get work
lautarolecumberry Dec 1, 2025
9cbf1c5
adapt async_get, get_dataframe
lautarolecumberry Dec 1, 2025
8bb3839
adapt latest
lautarolecumberry Dec 1, 2025
0950777
adapt save, async save, save dataframe
lautarolecumberry Dec 1, 2025
3a7a49d
change collection name to schema name, rename to functions
lautarolecumberry Dec 1, 2025
8a34dd3
clean models datalake
lautarolecumberry Dec 2, 2025
7686143
order datalake v4 generic, models
lautarolecumberry Dec 2, 2025
85f70a3
bump version
lautarolecumberry Dec 2, 2025
6d51a7f
fix dev version
lautarolecumberry Dec 2, 2025
9d0f4b6
remove routine evaluation v4
lautarolecumberry Dec 3, 2025
3b77eb1
add latest default
lautarolecumberry Dec 3, 2025
5c9abe8
remove old models
gbaudino-splight Dec 4, 2025
e90f093
move models into the splight_lib_models and export that models
gbaudino-splight Dec 4, 2025
1733141
use TransitionSchemaName instead of str
gbaudino-splight Dec 4, 2025
95a7eca
create apply and async apply in the requests
gbaudino-splight Dec 4, 2025
431dbb4
export datalake models and delete internal models from exports
gbaudino-splight Dec 4, 2025
f921488
bump dev version and lock
gbaudino-splight Dec 4, 2025
7ce2f40
remove unused import and use dict
gbaudino-splight Dec 4, 2025
264fd5f
remove TransitionSchemaName and use Literals
gbaudino-splight Dec 4, 2025
bd063ba
add new datav4 write models
gbaudino-splight Dec 5, 2025
5879144
adapt __to_write_request to the new write models?
gbaudino-splight Dec 5, 2025
d8f9ff8
fix Value TypeAlias
gbaudino-splight Dec 5, 2025
28854c6
some minimal fixes
gbaudino-splight Dec 9, 2025
b8e8719
dump to json mode to make it serializable
gbaudino-splight Dec 9, 2025
de35e59
remove Records common class, fix save and send_documents methods to u…
gbaudino-splight Dec 9, 2025
e0a7d7c
rename docs to data_points
gbaudino-splight Dec 9, 2025
46eb9cc
Update splight_lib/client/datalake/v4/generic.py
gbaudino-splight Dec 10, 2025
28c096b
final version
gbaudino-splight Dec 10, 2025
a17ab1c
Update filters to use solution_keys in SolutionOutputDocument
gbaudino-splight Dec 10, 2025
cb33b38
lock, remove duplicated export, fix to use solution_keys instead of f…
gbaudino-splight Dec 10, 2025
32d468a
new dev version
gbaudino-splight Dec 10, 2025
48bafaf
remove forced sync transport for async client
gbaudino-splight Dec 11, 2025
33d4375
final version
gbaudino-splight Dec 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "splight-lib"
version = "5.23.3"
version = "5.24.0"
description = "Splight Python Library"
authors = [
{name = "Splight Dev",email = "[email protected]"}
Expand Down
10 changes: 2 additions & 8 deletions splight_lib/client/datalake/common/abstract.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
from abc import abstractmethod
from typing import Any, TypedDict

from splight_lib.abstract.client import AbstractRemoteClient, QuerySet


class Records(TypedDict):
collection: str
records: list[dict[str, Any]]


# TODO: Fix this class after delete QuerySet
class AbstractDatalakeClient(AbstractRemoteClient):
def get(self, *args, **kwargs) -> QuerySet:
Expand All @@ -22,11 +16,11 @@ async def async_get(self, *args, **kwargs):
return await self._async_get(*args, **kwargs)

@abstractmethod
def save(self, records: Records) -> list[dict]:
def save(self, records: dict) -> list[dict]:
pass

@abstractmethod
async def async_save(self, records: Records) -> list[dict]:
async def async_save(self, records: dict) -> list[dict]:
pass

@abstractmethod
Expand Down
18 changes: 6 additions & 12 deletions splight_lib/client/datalake/v3/remote_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
from time import sleep

from furl import furl
from httpx import HTTPTransport
from retry import retry

from splight_lib.auth import SplightAuthToken
from splight_lib.client.datalake.common.abstract import (
AbstractDatalakeClient,
Records,
)
from splight_lib.client.datalake.common.abstract import AbstractDatalakeClient
from splight_lib.client.datalake.common.buffer import DatalakeDocumentBuffer
from splight_lib.client.datalake.v3.classmap import COLLECTION_PREFIXS_MAP
from splight_lib.client.datalake.v3.exceptions import DatalakeRequestError
Expand Down Expand Up @@ -42,16 +38,14 @@ def __init__(
self._api_version = "v3"
self._default_path = "data"

self._restclient = SplightRestClient(
transport=HTTPTransport(retries=3)
)
self._restclient = SplightRestClient()
self._restclient.update_headers(token.header)
logger.debug(
"Remote datalake client initialized.", tags=LogTags.DATALAKE
)

@retry(EXCEPTIONS, tries=3, delay=2, jitter=1)
def save(self, records: Records) -> list[dict]:
def save(self, records: dict) -> list[dict]:
prefix = self._get_prefix(records["collection"])
url = self._base_url / f"{prefix}/write/"
response = self._restclient.post(url, json=records)
Expand All @@ -62,7 +56,7 @@ def save(self, records: Records) -> list[dict]:
@retry(EXCEPTIONS, tries=3, delay=2, jitter=1)
async def async_save(
self,
records: Records,
records: dict,
) -> list[dict]:
# POST /data/write
prefix = self._get_prefix(records["collection"])
Expand Down Expand Up @@ -144,7 +138,7 @@ def __init__(
tags=LogTags.DATALAKE,
)

def save(self, records: Records) -> list[dict]:
def save(self, records: dict) -> list[dict]:
logger.debug("Saving documents in datalake", tags=LogTags.DATALAKE)
collection = records["collection"]
instances = records["records"]
Expand Down Expand Up @@ -243,7 +237,7 @@ def __init__(
tags=LogTags.DATALAKE,
)

def save(self, records: Records) -> list[dict]:
def save(self, records: dict) -> list[dict]:
logger.debug("Saving documents in datalake", tags=LogTags.DATALAKE)
collection = records["collection"]
buffer = self._data_buffers[collection]
Expand Down
21 changes: 20 additions & 1 deletion splight_lib/client/datalake/v4/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
BufferedSyncRemoteDataClient,
SyncRemoteDatalakeClient,
)
from splight_lib.settings import DatalakeClientType
from splight_lib.settings import (
DatalakeClientType,
SplightAPIVersion,
datalake_settings,
workspace_settings,
)

DL_CLIENT_TYPE_MAP = {
DatalakeClientType.BUFFERED_ASYNC: BufferedAsyncRemoteDatalakeClient,
Expand All @@ -22,3 +27,17 @@ def build(
parameters: dict[str, Any] = {},
) -> AbstractDatalakeClient:
return DL_CLIENT_TYPE_MAP[dl_client_type](**parameters)


def get_datalake_client() -> AbstractDatalakeClient:
    """Build the default datalake client from global settings.

    Reads credentials from ``workspace_settings`` and buffer tuning from
    ``datalake_settings``, always targeting the V4 API version.

    Returns:
        An ``AbstractDatalakeClient`` of the type selected by
        ``datalake_settings.DL_CLIENT_TYPE``.
    """
    client_parameters = {
        "base_url": workspace_settings.SPLIGHT_PLATFORM_API_HOST,
        "access_id": workspace_settings.SPLIGHT_ACCESS_ID,
        "secret_key": workspace_settings.SPLIGHT_SECRET_KEY,
        "api_version": SplightAPIVersion.V4,
        "buffer_size": datalake_settings.DL_BUFFER_SIZE,
        "buffer_timeout": datalake_settings.DL_BUFFER_TIMEOUT,
    }
    return DatalakeClientBuilder.build(
        dl_client_type=datalake_settings.DL_CLIENT_TYPE,
        parameters=client_parameters,
    )
8 changes: 0 additions & 8 deletions splight_lib/client/datalake/v4/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,3 @@ class DatalakeRequestError(Exception):
def __init__(self, status_code: int, message: str):
self._msg = f"Request failed with status code {status_code}: {message}"
super().__init__(self._msg)


class InvalidCollectionName(Exception):
def __init__(self, collection: str):
self._msg = f"Collection {collection} is not a valid collection"

def __str__(self) -> str:
return self._msg
34 changes: 34 additions & 0 deletions splight_lib/client/datalake/v4/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from datetime import datetime
from enum import Enum
from typing import TypeAlias

Timestamp: TypeAlias = datetime
# Order in types is important. When parsing if float is first,
# bools will be interpreted as floats (1.0, 0.0) instead of bools.
Value: TypeAlias = bool | str | float


class AggregationFunction(str, Enum):
    """Aggregation functions accepted by the datalake query API.

    Inherits from ``str`` so members serialize directly as their string
    values in request payloads.
    """

    SUM = "sum"
    AVG = "avg"
    MIN = "min"
    MAX = "max"
    COUNT = "count"
    LAST = "last"


class TimeUnit(str, Enum):
    """Time-bucket granularity for datalake aggregation windows.

    Inherits from ``str`` so members serialize as their string values
    in request payloads.
    """

    SECOND = "second"
    MINUTE = "minute"
    HOUR = "hour"
    DAY = "day"


class TransitionSchemaName(str, Enum):
    """Known schema names for transition-data requests.

    NOTE(review): a commit in this change set says this enum was to be
    replaced by ``Literal`` types — confirm whether it is still exported.
    """

    DEFAULT = "default"
    SOLUTIONS = "solutions"


class TransitionSort(int, Enum):
    """Sort direction for transition queries.

    Values 1 / -1 presumably follow the backend's Mongo-style sort
    direction convention (1 = ascending, -1 = descending) — confirm
    against the API docs.
    """

    ASC = 1
    DESC = -1
73 changes: 38 additions & 35 deletions splight_lib/client/datalake/v4/remote_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@
from time import sleep

from furl import furl
from httpx import HTTPTransport
from retry import retry

from splight_lib.auth import SplightAuthToken
from splight_lib.client.datalake.common.abstract import (
AbstractDatalakeClient,
Records,
)
from splight_lib.client.datalake.common.abstract import AbstractDatalakeClient
from splight_lib.client.datalake.common.buffer import DatalakeDocumentBuffer
from splight_lib.client.datalake.v4.exceptions import DatalakeRequestError
from splight_lib.client.exceptions import SPLIGHT_REQUEST_EXCEPTIONS
Expand Down Expand Up @@ -39,9 +35,7 @@ def __init__(
secret_key=secret_key,
)

self._restclient = SplightRestClient(
transport=HTTPTransport(retries=3)
)
self._restclient = SplightRestClient()
self._restclient.update_headers(token.header)
logger.debug(
"Remote datalake client initialized.", tags=LogTags.DATALAKE
Expand Down Expand Up @@ -84,8 +78,7 @@ async def _async_get(self, request: dict) -> list[dict]:

@property
def prefix(self) -> str:
return "v4/data"
# return f"v4/data/{self.resource}"
return "v4/data/transition"


class BufferedAsyncRemoteDatalakeClient(SyncRemoteDatalakeClient):
Expand Down Expand Up @@ -140,28 +133,29 @@ def __init__(

def save(self, records: dict) -> list[dict]:
logger.debug("Saving documents in datalake", tags=LogTags.DATALAKE)
instances = records["records"]
collection = records["collection"]
buffer = self._data_buffers[collection]
instance = records["records"]
data_points = instance["data_points"]
schema_name = instance["schema_name"]
buffer = self._data_buffers[schema_name]
with self._lock:
if buffer.should_flush():
logger.debug(
"Flushing datalake buffer with %s elements",
len(buffer.data),
)
self._send_documents(collection, buffer.data)
self._send_documents(schema_name, buffer.data)
buffer.reset()
buffer.add_documents(instances)
return instances
buffer.add_documents(data_points)
return data_points

def _flusher(self):
while True:
for collection, buffer in self._data_buffers.items():
self._flush_buffer(collection, buffer)
for schema_name, buffer in self._data_buffers.items():
self._flush_buffer(schema_name, buffer)
sleep(0.5)

def _flush_buffer(
self, collection: str, buffer: DatalakeDocumentBuffer
self, schema_name: str, buffer: DatalakeDocumentBuffer
) -> None:
with self._lock:
if buffer.should_flush():
Expand All @@ -170,22 +164,26 @@ def _flush_buffer(
"Flushing datalake buffer with %s elements",
len(buffer.data),
)
self._send_documents(collection, buffer.data)
self._send_documents(schema_name, buffer.data)
buffer.reset()
except Exception:
logger.error("Unable to save documents", exc_info=True)

@retry(EXCEPTIONS, tries=3, delay=2, jitter=1)
def _send_documents(self, collection: str, docs: list[dict]) -> list[dict]:
def _send_documents(
self, schema_name: str, data_points: list[dict]
) -> list[dict]:
url = self._base_url / f"{self.prefix}/write/"
data = {
"collection": collection,
"records": docs,
"records": {
"schema_name": schema_name,
"data_points": data_points,
},
}
response = self._restclient.post(url, json=data)
if response.is_error:
raise DatalakeRequestError(response.status_code, response.text)
return docs
return data_points


class BufferedSyncRemoteDataClient(SyncRemoteDatalakeClient):
Expand Down Expand Up @@ -236,31 +234,36 @@ def __init__(
tags=LogTags.DATALAKE,
)

def save(self, records: Records) -> list[dict]:
def save(self, records: dict) -> list[dict]:
logger.debug("Saving documents in datalake", tags=LogTags.DATALAKE)
collection = records["collection"]
records = records["records"]
buffer = self._data_buffers[collection]
instance = records["records"]
schema_name = instance["schema_name"]
data_points = instance["data_points"]
buffer = self._data_buffers[schema_name]
with self._lock:
buffer.add_documents(records)
buffer.add_documents(data_points)
if buffer.should_flush():
logger.debug(
"Flushing datalake buffer with %s elements",
len(buffer.data),
tags=LogTags.DATALAKE,
)
self._send_documents(collection, buffer.data)
self._send_documents(schema_name, buffer.data)
buffer.reset()
return records
return data_points

@retry(EXCEPTIONS, tries=3, delay=2, jitter=1)
def _send_documents(self, collection: str, docs: list[dict]) -> list[dict]:
def _send_documents(
self, schema_name: str, data_points: list[dict]
) -> list[dict]:
url = self._base_url / f"{self.prefix}/write/"
data = {
"collection": collection,
"records": docs,
"records": {
"schema_name": schema_name,
"data_points": data_points,
}
}
response = self._restclient.post(url, json=data)
if response.is_error:
raise DatalakeRequestError(response.status_code, response.text)
return docs
return data_points
Loading
Loading