Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docarray/array/document.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def __new__(cls, *args, storage: str = 'memory', **kwargs) -> 'DocumentArrayLike
from .sqlite import DocumentArraySqlite

instance = super().__new__(DocumentArraySqlite)
elif storage == 'pqlite':
from .pqlite import DocumentArrayPqlite

instance = super().__new__(DocumentArrayPqlite)
elif storage == 'weaviate':
from .weaviate import DocumentArrayWeaviate

Expand Down
7 changes: 7 additions & 0 deletions docarray/array/pqlite.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .document import DocumentArray
from .storage.pqlite import StorageMixins


class DocumentArrayPqlite(StorageMixins, DocumentArray):
    """A :class:`DocumentArray` backed by PQLite storage.

    The storage behaviour comes entirely from ``StorageMixins``; this class
    only pins the MRO so the mixins take precedence over ``DocumentArray``.
    """

    def __new__(cls, *args, **kwargs):
        # Bypass DocumentArray.__new__'s storage dispatch: the concrete
        # backend class is already known here, so create the instance directly.
        instance = super().__new__(cls)
        return instance
10 changes: 10 additions & 0 deletions docarray/array/storage/pqlite/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .backend import BackendMixin
from .getsetdel import GetSetDelMixin
from .seqlike import SequenceLikeMixin
from abc import ABC

__all__ = ['StorageMixins']


class StorageMixins(BackendMixin, GetSetDelMixin, SequenceLikeMixin, ABC):
    """Bundle the PQLite backend, get/set/del and sequence-like mixins
    into a single abstract base consumed by ``DocumentArrayPqlite``."""
63 changes: 63 additions & 0 deletions docarray/array/storage/pqlite/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from dataclasses import dataclass, asdict
from typing import (
Union,
Dict,
Optional,
TYPE_CHECKING,
)

from ..base.backend import BaseBackendMixin
from ....helper import dataclass_from_dict

if TYPE_CHECKING:
from ....types import (
DocumentArraySourceType,
)


@dataclass
class PqliteConfig:
    """Configuration of the PQLite storage backend.

    Everything except ``n_dim`` is forwarded as keyword arguments to
    ``pqlite.PQLite`` (see ``BackendMixin._init_storage``).
    """

    n_dim: int = 1  # dimensionality of the embedding vectors
    metric: str = 'cosine'  # distance metric used by PQLite
    serialize_protocol: str = 'pickle'  # presumably how docs are serialized by pqlite — confirm
    data_path: Optional[str] = None  # None -> a temporary dir is used and data is not persisted


class BackendMixin(BaseBackendMixin):
    """Provide necessary functions to enable this storage backend."""

    def _init_storage(
        self,
        docs: Optional['DocumentArraySourceType'] = None,
        config: Optional[Union[PqliteConfig, Dict]] = None,
    ):
        """Initialize the PQLite index and the offset-to-id mapping.

        :param docs: optional initial documents; when given, any existing
            content at ``config.data_path`` is cleared and replaced.
        :param config: a :class:`PqliteConfig` or a plain dict of its fields;
            defaults to ``PqliteConfig()``.
        """
        if not config:
            config = PqliteConfig()
        if isinstance(config, dict):
            config = dataclass_from_dict(PqliteConfig, config)

        # without an explicit data_path the storage is ephemeral
        self._persist = bool(config.data_path)

        if not self._persist:
            from tempfile import mkdtemp

            # BUGFIX: the previous `TemporaryDirectory().name` dropped the
            # TemporaryDirectory object immediately, so the directory was
            # removed as soon as the object was garbage-collected — while
            # PQLite was still using it. `mkdtemp` creates a directory that
            # outlives this call.
            config.data_path = mkdtemp()

        self._config = config

        from pqlite import PQLite
        from .helper import OffsetMapping

        # all remaining config fields are forwarded verbatim to PQLite
        config = asdict(config)
        n_dim = config.pop('n_dim')

        self._pqlite = PQLite(n_dim, **config)
        self._offset2ids = OffsetMapping(
            name='docarray',
            data_path=config['data_path'],
            in_memory=False,
        )

        if docs is not None:
            self.clear()
            self.extend(docs)
88 changes: 88 additions & 0 deletions docarray/array/storage/pqlite/getsetdel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import (
Sequence,
Iterable,
)
import numpy as np
from ...memory import DocumentArrayInMemory
from ..base.getsetdel import BaseGetSetDelMixin
from .... import Document


class GetSetDelMixin(BaseGetSetDelMixin):
    """Implement required and derived functions that power `getitem`, `setitem`, `delitem`"""

    # essential methods start

    def _get_doc_by_offset(self, offset: int) -> 'Document':
        """Return the Document at `offset`; negative offsets count from the end.

        :raises IndexError: if the offset maps to no document.
        """
        offset = len(self) + offset if offset < 0 else offset
        doc_id = self._offset2ids.get_id_by_offset(offset)
        doc = self._pqlite.get_doc_by_id(doc_id) if doc_id else None
        if doc is None:
            raise IndexError('index out of range')
        return doc

    def _get_doc_by_id(self, _id: str) -> 'Document':
        """Return the Document with id `_id`.

        :raises KeyError: if no document has that id.
        """
        doc = self._pqlite.get_doc_by_id(_id)
        if doc is None:
            raise KeyError(f'Can not find Document with id=`{_id}`')
        return doc

    def _get_docs_by_offsets(self, offsets: Sequence[int]) -> Iterable['Document']:
        ids = self._offset2ids.get_ids_by_offsets(offsets)
        return self._get_docs_by_ids(ids)

    def _get_docs_by_ids(self, ids: Iterable[str]) -> Iterable['Document']:
        # BUGFIX: annotation corrected from `str` to `Iterable[str]` — the
        # body iterates over individual ids
        for _id in ids:
            yield self._get_doc_by_id(_id)

    def _get_docs_by_slice(self, _slice: slice) -> Iterable['Document']:
        return self._get_docs_by_offsets(range(len(self))[_slice])

    def _get_docs_by_mask(self, mask: Sequence[bool]):
        # BUGFIX: test truthiness instead of `is True` — numpy booleans are
        # not the `True` singleton and were silently skipped before
        offsets = [i for i, m in enumerate(mask) if m]
        return self._get_docs_by_offsets(offsets)

    def _set_doc_by_offset(self, offset: int, value: 'Document'):
        offset = len(self) + offset if offset < 0 else offset
        self._offset2ids.set_at_offset(offset, value.id)
        docs = DocumentArrayInMemory([value])
        if docs.embeddings is None:
            # PQLite requires an embedding; fall back to a zero vector
            docs.embeddings = np.zeros((1, self._pqlite.dim))
        self._pqlite.update(docs)

    def _set_doc_by_id(self, _id: str, value: 'Document'):
        # NOTE(review): `get_offset_by_id` returns None for an unknown id;
        # this assumes `_id` exists — confirm against callers
        offset = self._offset2ids.get_offset_by_id(_id)
        self._set_doc_by_offset(offset, value)

    def _set_doc_value_pairs(
        self, docs: Iterable['Document'], values: Iterable['Document']
    ):
        for _d, _v in zip(docs, values):
            self._set_doc_by_id(_d.id, _v)

    def _del_doc_by_id(self, _id: str):
        offset = self._offset2ids.get_offset_by_id(_id)
        self._offset2ids.del_at_offset(offset, commit=True)
        self._pqlite.delete([_id])

    def _del_doc_by_offset(self, offset: int):
        offset = len(self) + offset if offset < 0 else offset
        _id = self._offset2ids.get_id_by_offset(offset)
        self._offset2ids.del_at_offset(offset)
        self._pqlite.delete([_id])

    def _del_doc_by_offsets(self, offsets: Sequence[int]):
        ids = [self._offset2ids.get_id_by_offset(offset) for offset in offsets]
        self._offset2ids.del_at_offsets(offsets)
        self._pqlite.delete(ids)

    def _del_docs_by_slice(self, _slice: slice):
        self._del_doc_by_offsets(range(len(self))[_slice])

    def _del_docs_by_mask(self, mask: Sequence[bool]):
        # same truthiness fix as `_get_docs_by_mask`
        self._del_doc_by_offsets([i for i, m in enumerate(mask) if m])
115 changes: 115 additions & 0 deletions docarray/array/storage/pqlite/helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from typing import Optional, List, Tuple

from pqlite.storage.table import Table


class OffsetMapping(Table):
    """SQLite-backed bidirectional mapping between integer offsets and doc ids.

    Offsets are kept contiguous (0..size-1); deletions and insertions shift
    the trailing offsets left/right accordingly.
    """

    def __init__(
        self,
        name: str = 'offset2ids',
        data_path: Optional[str] = None,
        in_memory: bool = True,
    ):
        super().__init__(name, data_path, in_memory)
        self.create_table()
        self._size = None  # lazily-computed cached row count; None = unknown

    def create_table(self):
        # BUGFIX: the column type was `INTEGER TEXT`, which SQLite resolves
        # to INTEGER affinity — wrong for string document ids; use TEXT.
        # (Existing tables are untouched: CREATE TABLE IF NOT EXISTS.)
        sql = f'''CREATE TABLE IF NOT EXISTS {self.name}
                    (offset INTEGER NOT NULL PRIMARY KEY,
                     doc_id TEXT NOT NULL)'''

        self.execute(sql, commit=True)

    def clear(self):
        """Remove all rows and invalidate the cached size."""
        super().clear()
        self._size = None

    def __len__(self):
        return self.size

    @property
    def size(self):
        if self._size is None:
            sql = f'SELECT MAX(offset) from {self.name} LIMIT 1;'
            result = self._conn.execute(sql).fetchone()
            # BUGFIX: MAX() is NULL on an empty table, but 0 is a valid max
            # offset (one row) — compare against None, not truthiness
            self._size = result[0] + 1 if result[0] is not None else 0

        return self._size

    def extend_doc_ids(self, doc_ids: List[str], commit: bool = True):
        """Append `doc_ids` at offsets size, size+1, ..."""
        offsets = [self.size + i for i in range(len(doc_ids))]
        offset_ids = list(zip(offsets, doc_ids))
        self._insert(offset_ids, commit=commit)

    def _insert(self, offset_ids: List[Tuple[int, str]], commit: bool = True):
        # BUGFIX: read the size BEFORE inserting — computing it afterwards
        # from the DB would already include the new rows and double-count
        size = self.size
        sql = f'INSERT INTO {self.name}(offset, doc_id) VALUES (?, ?);'
        self.execute_many(sql, offset_ids, commit=commit)
        self._size = size + len(offset_ids)

    def get_id_by_offset(self, offset: int):
        """Return the doc id at `offset` (negative counts from the end), or None."""
        offset = len(self) + offset if offset < 0 else offset
        sql = f'SELECT doc_id FROM {self.name} WHERE offset = ? LIMIT 1;'
        result = self._conn.execute(sql, (offset,)).fetchone()
        return str(result[0]) if result is not None else None

    def get_ids_by_offsets(self, offsets: List[int]) -> List[str]:
        return [self.get_id_by_offset(offset) for offset in offsets]

    def get_offsets_by_ids(self, ids: List[str]) -> List[int]:
        return [self.get_offset_by_id(k) for k in ids]

    def get_offset_by_id(self, doc_id: str):
        """Return the offset of `doc_id`, or None if unknown."""
        sql = f'SELECT offset FROM {self.name} WHERE doc_id=? LIMIT 1;'
        result = self._conn.execute(sql, (doc_id,)).fetchone()
        return result[0] if result else None

    def del_at_offset(self, offset: int, commit: bool = True):
        """Delete the row at `offset` and shift the trailing offsets left."""
        # BUGFIX: always prime the cached size first — the old code did
        # `self._size -= 1`, a TypeError when the cache was still None
        size = len(self)
        offset = size + offset if offset < 0 else offset
        sql = f'DELETE FROM {self.name} WHERE offset=?'
        self._conn.execute(sql, (offset,))
        self.shift_offset(offset, shift_step=1, direction='left', commit=commit)

        self._size = size - 1

    def del_at_offsets(self, offsets: List[int], commit: bool = True):
        # delete from the back so earlier offsets stay valid while shifting
        for offset in sorted(offsets, reverse=True):
            self.del_at_offset(offset, commit=False)
        if commit:
            self.commit()

    def insert_at_offset(self, offset: int, doc_id: str, commit: bool = True):
        """Insert `doc_id` at `offset`, shifting offsets >= `offset` right."""
        offset = len(self) + offset if offset < 0 else offset
        self.shift_offset(offset - 1, shift_step=1, direction='right', commit=False)
        self._insert([(offset, doc_id)], commit=commit)

    def set_at_offset(self, offset: int, doc_id: str, commit: bool = True):
        """Overwrite the doc id stored at `offset`."""
        offset = len(self) + offset if offset < 0 else offset
        sql = f'UPDATE {self.name} SET doc_id=? WHERE offset = ?'
        self._conn.execute(
            sql,
            (
                doc_id,
                offset,
            ),
        )
        if commit:
            self.commit()

    def shift_offset(
        self,
        shift_from: int,
        shift_step: int = 1,
        direction: str = 'left',
        commit: bool = True,
    ):
        """Shift all offsets strictly greater than `shift_from` by `shift_step`."""
        if direction == 'left':
            sql = f'UPDATE {self.name} SET offset=offset-{shift_step} WHERE offset > ?'
        elif direction == 'right':
            sql = f'UPDATE {self.name} SET offset=offset+{shift_step} WHERE offset > ?'
        else:
            # BUGFIX: corrected garbled error message
            raise ValueError(f'The shift_offset direction {direction} is not supported!')

        self._conn.execute(sql, (shift_from,))
        if commit:
            self._conn.commit()
80 changes: 80 additions & 0 deletions docarray/array/storage/pqlite/seqlike.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
from typing import Iterator, Union, Iterable, Sequence, MutableSequence
import numpy as np
from .... import Document
from ...memory import DocumentArrayInMemory


class SequenceLikeMixin(MutableSequence[Document]):
    """Implement sequence-like methods"""

    def insert(self, index: int, value: 'Document'):
        """Insert `doc` at `index`.

        :param index: Position of the insertion.
        :param value: The doc needs to be inserted.
        """
        if value.embedding is None:
            value.embedding = np.zeros(self._pqlite.dim, dtype=np.float32)

        self._pqlite.index(DocumentArrayInMemory([value]))
        self._offset2ids.insert_at_offset(index, value.id)

    def append(self, value: 'Document') -> None:
        """Append `value` to the end of the array."""
        # BUGFIX: consistent with `insert`/`extend` — PQLite needs an
        # embedding, so fill a zero vector when none is set
        if value.embedding is None:
            value.embedding = np.zeros(self._pqlite.dim, dtype=np.float32)

        self._pqlite.index(DocumentArrayInMemory([value]))
        self._offset2ids.extend_doc_ids([value.id])

    def extend(self, values: Iterable['Document']) -> None:
        """Append all docs in `values`, zero-filling missing embeddings."""
        docs = DocumentArrayInMemory(values)
        for doc in docs:
            if doc.embedding is None:
                doc.embedding = np.zeros(self._pqlite.dim, dtype=np.float32)
        self._pqlite.index(docs)
        self._offset2ids.extend_doc_ids([doc.id for doc in docs])

    def clear(self):
        """Clear the data of :class:`DocumentArray`"""
        self._offset2ids.clear()
        self._pqlite.clear()

    def __del__(self) -> None:
        # non-persistent arrays drop their (temporary) data on destruction
        if not self._persist:
            self._offset2ids.clear()
            self._pqlite.clear()

    def __eq__(self, other):
        """In pqlite backend, data are considered as identical if configs point to the same database source"""
        return (
            type(self) is type(other)
            and type(self._config) is type(other._config)
            and self._config == other._config
        )

    def __len__(self):
        return self._offset2ids.size

    def __iter__(self) -> Iterator['Document']:
        for i in range(len(self)):
            yield self[i]

    def __contains__(self, x: Union[str, 'Document']):
        if isinstance(x, str):
            return self._offset2ids.get_offset_by_id(x) is not None
        elif isinstance(x, Document):
            return self._offset2ids.get_offset_by_id(x.id) is not None
        else:
            return False

    def __bool__(self):
        """To simulate ```l = []; if l: ...```

        :return: returns true if the length of the array is larger than 0
        """
        return len(self) > 0

    def __repr__(self):
        return f'<DocumentArray[PQLite] (length={len(self)}) at {id(self)}>'

    def __add__(self, other: Union['Document', Sequence['Document']]):
        """Return a new array with the docs of `self` followed by `other`."""
        v = type(self)(self)
        v.extend(other)
        return v
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
'fastapi',
'uvicorn',
'weaviate-client~=3.3.0',
'pqlite>=0.2.1',
],
'test': [
'pytest',
Expand Down
Loading