diff --git a/docarray/array/document.py b/docarray/array/document.py index 7ebb0880f89..336e6b091cf 100644 --- a/docarray/array/document.py +++ b/docarray/array/document.py @@ -13,12 +13,14 @@ from docarray.array.elastic import DocumentArrayElastic from docarray.array.redis import DocumentArrayRedis from docarray.array.milvus import DocumentArrayMilvus + from docarray.array.opensearch import DocumentArrayOpenSearch from docarray.array.storage.sqlite import SqliteConfig from docarray.array.storage.annlite import AnnliteConfig from docarray.array.storage.weaviate import WeaviateConfig from docarray.array.storage.elastic import ElasticConfig from docarray.array.storage.redis import RedisConfig from docarray.array.storage.milvus import MilvusConfig + from docarray.array.storage.opensearch import OpenSearchConfig class DocumentArray(AllMixins, BaseDocumentArray): @@ -150,6 +152,15 @@ def __new__( config: Optional[Union['MilvusConfig', Dict]] = None, ) -> 'DocumentArrayMilvus': """Create a Milvus-powered DocumentArray object.""" + + @overload + def __new__( + cls, + _docs: Optional['DocumentArraySourceType'] = None, + storage: str = 'opensearch', + config: Optional[Union['OpenSearchConfig', Dict]] = None, + ) -> 'DocumentArrayOpenSearch': + """Create an OpenSearch-powered DocumentArray object.""" ... def __enter__(self): @@ -200,6 +211,10 @@ def __new__(cls, *args, storage: str = 'memory', **kwargs): from .milvus import DocumentArrayMilvus instance = super().__new__(DocumentArrayMilvus) + elif storage == 'opensearch': + from docarray.array.opensearch import DocumentArrayOpenSearch + + instance = super().__new__(DocumentArrayOpenSearch) else: raise ValueError(f'storage=`{storage}` is not supported.') diff --git a/docarray/array/opensearch.py b/docarray/array/opensearch.py new file mode 100644 index 00000000000..e8d38273613 --- /dev/null +++ b/docarray/array/opensearch.py @@ -0,0 +1,19 @@ +from docarray.array.document import DocumentArray +from docarray.array.storage.opensearch import StorageMixins, OpenSearchConfig + +__all__ = ['DocumentArrayOpenSearch', 'OpenSearchConfig'] + + +class DocumentArrayOpenSearch(StorageMixins, DocumentArray): + """This is a :class:`DocumentArray` that uses OpenSearch as + vector search engine and storage. + """ + + def __new__(cls, *args, **kwargs): + """``__new__`` method for :class:`DocumentArrayOpenSearch` + + :param *args: list of args to instantiate the object + :param **kwargs: dict of args to instantiate the object + :return: the instantiated :class:`DocumentArrayOpenSearch` object + """ + return super().__new__(cls) diff --git a/docarray/array/storage/opensearch/__init__.py b/docarray/array/storage/opensearch/__init__.py new file mode 100644 index 00000000000..08ea08169a3 --- /dev/null +++ b/docarray/array/storage/opensearch/__init__.py @@ -0,0 +1,12 @@ +from abc import ABC + +from docarray.array.storage.opensearch.backend import BackendMixin, OpenSearchConfig +from docarray.array.storage.opensearch.find import FindMixin +from docarray.array.storage.opensearch.getsetdel import GetSetDelMixin +from docarray.array.storage.opensearch.seqlike import SequenceLikeMixin + +__all__ = ['StorageMixins', 'OpenSearchConfig'] + + +class StorageMixins(FindMixin, BackendMixin, GetSetDelMixin, SequenceLikeMixin, ABC): + ... 
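For orientation before the backend internals: the pieces above wire `storage='opensearch'` into the `DocumentArray` factory, so both construction paths below end up on `DocumentArrayOpenSearch`. A minimal sketch (it assumes a reachable OpenSearch instance at the config's default address; adjust `hosts` to your setup):

```python
from docarray import DocumentArray
from docarray.array.opensearch import DocumentArrayOpenSearch, OpenSearchConfig

# Factory path: DocumentArray dispatches on the storage string.
da = DocumentArray(storage='opensearch', config={'n_dim': 128})

# Direct path: instantiate the subclass with a typed config.
da2 = DocumentArrayOpenSearch(config=OpenSearchConfig(n_dim=128))

assert isinstance(da, DocumentArrayOpenSearch)
```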
diff --git a/docarray/array/storage/opensearch/backend.py b/docarray/array/storage/opensearch/backend.py new file mode 100644 index 00000000000..a96bd8bdb6c --- /dev/null +++ b/docarray/array/storage/opensearch/backend.py @@ -0,0 +1,312 @@ +import copy +import uuid +from typing import ( + Optional, + TYPE_CHECKING, + Union, + Dict, + List, + Mapping, + Any, + Tuple, + Iterable, +) +from dataclasses import dataclass, field + +from docarray.array.storage.base.backend import BaseBackendMixin, TypeMap +from docarray import Document +from docarray.helper import dataclass_from_dict, _safe_cast_int + +import numpy as np + +from opensearchpy import OpenSearch +from opensearchpy.helpers import parallel_bulk +import warnings + +if TYPE_CHECKING: + from docarray.typing import ( + DocumentArraySourceType, + ) + from docarray.typing import DocumentArraySourceType, ArrayType + + +@dataclass +class OpenSearchConfig: + n_dim: int # dims in opensearch + distance: str = 'cosinesimil' # similarity in opensearch + hosts: Union[ + str, List[Union[str, Mapping[str, Union[str, int]]]], None + ] = 'http://localhost:9900' + index_name: Optional[str] = None + list_like: bool = True + opensearch_config: Dict[str, Any] = field(default_factory=dict) + index_text: bool = False + tag_indices: List[str] = field(default_factory=list) + batch_size: int = 64 + ef_construction: Optional[int] = 512 + m: Optional[int] = 16 + columns: Optional[Union[List[Tuple[str, str]], Dict[str, str]]] = None + engine: str = 'nmslib' + ef_search: Optional[int] = None + encoder: Optional[str] = None + algorithm: str = 'hnsw' + root_id: bool = True + + +_banned_indexname_chars = ['[', ' ', '"', '*', '\\', '<', '|', ',', '>', '/', '?', ']'] + + +def _sanitize_index_name(name): + new_name = name + for char in _banned_indexname_chars: + new_name = new_name.replace(char, '') + return new_name + + +class BackendMixin(BaseBackendMixin): + TYPE_MAP = { + 'str': TypeMap(type='text', converter=str), + 'float': TypeMap(type='float', converter=float), + 'int': TypeMap(type='integer', converter=_safe_cast_int), + 'double': TypeMap(type='double', converter=float), + 'long': TypeMap(type='long', converter=_safe_cast_int), + 'bool': TypeMap(type='boolean', converter=bool), + } + + def _init_storage( + self, + _docs: Optional['DocumentArraySourceType'] = None, + config: Optional[Union[OpenSearchConfig, Dict]] = None, + **kwargs, + ): + config = copy.deepcopy(config) + if not config: + raise ValueError('Empty config is not allowed for OpenSearch storage') + elif isinstance(config, dict): + config = dataclass_from_dict(OpenSearchConfig, config) + + if config.index_name is None: + id = uuid.uuid4().hex + config.index_name = 'index_name__' + id + + self._index_name_offset2id = 'offset2id__' + config.index_name + self._config = config + + self._config.columns = self._normalize_columns(self._config.columns) + + self.n_dim = self._config.n_dim + self._client = self._build_client() + self._list_like = self._config.list_like + self._build_offset2id_index() + + # Note super()._init_storage() calls _load_offset2ids which calls _get_offset2ids_meta + super()._init_storage(**kwargs) + + if _docs is None: + return + elif isinstance(_docs, Iterable): + self.extend(_docs) + else: + if isinstance(_docs, Document): + self.append(_docs) + + def _ensure_unique_config( + self, + config_root: dict, + config_subindex: dict, + config_joined: dict, + subindex_name: str, + ) -> dict: + if 'index_name' not in config_subindex: + unique_index_name = _sanitize_index_name( + 
config_joined['index_name'] + '_subindex_' + subindex_name + ) + config_joined['index_name'] = unique_index_name + return config_joined + + def _build_offset2id_index(self): + if not self._client.indices.exists(index=self._index_name_offset2id): + self._client.indices.create(index=self._index_name_offset2id, ignore=[404]) + + def _get_offset2ids_meta(self) -> List: + """Return the offset2ids stored in opensearch + + :return: a list containing ids + + :raises ValueError: error is raised if index _client is not found or no offsets are found + """ + if not self._client: + raise ValueError('OpenSearch client does not exist') + + n_docs = self._client.count(index=self._index_name_offset2id)["count"] + + if n_docs != 0: + offsets = [x for x in range(n_docs)] + resp = self._client.mget( + index=self._index_name_offset2id, body={'ids': offsets} + ) + ids = [x['_source']['blob'] for x in resp['docs']] + return ids + else: + return [] + + def _build_client(self): + + client = OpenSearch( + hosts=self._config.hosts, + **self._config.opensearch_config, + ) + + schema = self._build_schema_from_opensearch_config(self._config) + + if not client.indices.exists(index=self._config.index_name): + client.indices.create( + index=self._config.index_name, + body={ + 'settings': { + 'index': {'knn': True, "knn.algo_param.ef_search": 100} + }, + 'mappings': schema['mappings'], + }, + ) + + client.indices.refresh(index=self._config.index_name) + return client + + def _build_schema_from_opensearch_config(self, opensearch_config: OpenSearchConfig): + da_schema = { + 'mappings': { + 'dynamic': 'true', + '_source': {'enabled': 'true'}, + 'properties': { + 'embedding': { + 'type': 'knn_vector', + 'dimension': opensearch_config.n_dim, + 'method': { + 'name': opensearch_config.algorithm, + 'space_type': opensearch_config.distance, + 'engine': opensearch_config.engine, + 'parameters': { + # 'ef_search': opensearch_config.ef_search, TODO: add + 'ef_construction': opensearch_config.ef_construction, + 'm': opensearch_config.m, + # 'encoder': opensearch_config.encoder, TODO: add + }, + }, + }, + 'text': {'type': 'text', 'index': opensearch_config.index_text}, + }, + } + } + if opensearch_config.tag_indices: + for index in opensearch_config.tag_indices: + da_schema['mappings']['properties'][index] = { + 'type': 'text', + 'index': True, + } + + for col, coltype in self._config.columns.items(): + da_schema['mappings']['properties'][col] = { + 'type': self._map_type(coltype), + 'index': True, + } + + if self._config.m or self._config.ef_construction: + index_options = { + 'm': self._config.m or 16, + 'ef_construction': self._config.ef_construction or 512, + } + if self._config.ef_search: + index_options['ef_search'] = self._config.ef_search + if self._config.encoder: + index_options['encoder'] = self._config.encoder + da_schema['mappings']['properties']['embedding']['method'][ + 'parameters' + ] = index_options + return da_schema + + def _refresh(self, index_name): + self._client.indices.refresh(index=index_name) + + def _doc_id_exists(self, doc_id): + return self._client.exists(index=self._config.index_name, id=doc_id) + + def _send_requests(self, request, **kwargs) -> List[Dict]: + """Send bulk request to OpenSearch and gather the successful info""" + + # for backward compatibility + if 'chunk_size' not in kwargs: + kwargs['chunk_size'] = self._config.batch_size + + accumulated_info = [] + for success, info in parallel_bulk( + self._client, + request, + raise_on_error=False, + raise_on_exception=False, + **kwargs, + ): + if not 
success: + warnings.warn(str(info)) + else: + accumulated_info.append(info) + + return accumulated_info + + def _update_offset2ids_meta(self): + """Update the offset2ids in opensearch""" + if self._client.indices.exists(index=self._index_name_offset2id): + requests = [ + { + '_op_type': 'index', + '_id': offset_, # note offset goes here because it's what we want to get by + '_index': self._index_name_offset2id, + 'blob': f'{id_}', # the Document id is stored in the blob field + } + for offset_, id_ in enumerate(self._offset2ids.ids) + ] + self._send_requests(requests) + self._client.indices.refresh(index=self._index_name_offset2id) + + # Clean trailing unused offsets + offset_count = self._client.count(index=self._index_name_offset2id) + unused_offsets = range(len(self._offset2ids.ids), offset_count['count']) + + if len(unused_offsets) > 0: + requests = [ + { + '_op_type': 'delete', + '_id': offset_, # note offset goes here because it's what we want to get by + '_index': self._index_name_offset2id, + } + for offset_ in unused_offsets + ] + self._send_requests(requests) + self._client.indices.refresh(index=self._index_name_offset2id) + + def _map_embedding(self, embedding: 'ArrayType') -> List[float]: + from docarray.math.helper import EPSILON + + if embedding is None: + embedding = np.zeros(self.n_dim) + EPSILON + else: + from docarray.math.ndarray import to_numpy_array + + embedding = to_numpy_array(embedding) + + if embedding.ndim > 1: + embedding = np.asarray(embedding).squeeze() + + if np.all(embedding == 0): + embedding = embedding + EPSILON + + return embedding + + def __getstate__(self): + d = dict(self.__dict__) + del d['_client'] + return d + + def __setstate__(self, state): + self.__dict__ = state + self._client = self._build_client() diff --git a/docarray/array/storage/opensearch/find.py b/docarray/array/storage/opensearch/find.py new file mode 100644 index 00000000000..736df9678d5 --- /dev/null +++ b/docarray/array/storage/opensearch/find.py @@ -0,0 +1,197 @@ +from typing import ( + TYPE_CHECKING, + TypeVar, + Sequence, + List, + Union, + Optional, + Dict, +) + +import numpy as np + +from docarray import Document, DocumentArray +from docarray.math import ndarray +from docarray.math.helper import EPSILON +from docarray.math.ndarray import to_numpy_array +from docarray.score import NamedScore +from docarray.array.mixins.find import FindMixin as BaseFindMixin + + +if TYPE_CHECKING: # pragma: no cover + import tensorflow + import torch + + OpenSearchArrayType = TypeVar( + 'OpenSearchArrayType', + np.ndarray, + tensorflow.Tensor, + torch.Tensor, + Sequence[float], + Dict, + ) + + +class FindMixin(BaseFindMixin): + def _find_similar_vectors( + self, + query: 'OpenSearchArrayType', + filter: Optional[Dict] = None, + limit=10, + **kwargs, + ): + """ + Return vector search results for the input query. `script_score` is used to apply `filter` if it is set. + :param query: query vector used for vector search + :param filter: filter query used for pre-filtering + :param limit: number of items to be retrieved + :return: DocumentArray containing the closest documents to the query if it is a single query, otherwise a list of DocumentArrays containing + the closest Document objects for each of the queries in `query`.
+ """ + query = to_numpy_array(query) + is_all_zero = np.all(query == 0) + if is_all_zero: + query = query + EPSILON + + filter_query = {'match_all': {}} + + if filter: + filter_query = {'bool': {'filter': filter}} + + knn_query = { + 'size': limit, + 'query': { + 'script_score': { + 'query': filter_query, + 'script': { + 'lang': 'knn', + 'source': 'knn_score', + 'params': { + 'field': 'embedding', + 'query_value': query, + 'space_type': self._get_distance_metric( + kwargs.get('distance') + ), + }, + }, + } + }, + } + + resp = self._client.search(index=self._config.index_name, body=knn_query) + list_of_hits = resp['hits']['hits'] + + da = DocumentArray() + for result in list_of_hits: + doc = Document.from_base64(result['_source']['blob']) + doc.scores['score'] = NamedScore(value=result['_score']) + doc.embedding = result['_source']['embedding'] + da.append(doc) + + return da + + def _get_distance_metric(self, distance=None): + return distance if distance else self._config.distance + + def _find_similar_documents_from_text( + self, + query: str, + index: str = 'text', + filter: Union[dict, list] = None, + limit: int = 10, + ): + """ + Return keyword matches for the input query + :param query: text used for keyword search + :param limit: number of items to be retrieved + :return: DocumentArray containing the closest documents to the query if it is a single query, otherwise a list of DocumentArrays containing + the closest Document objects for each of the queries in `query`. + """ + + query = { + '_source': ['id', 'blob', 'text'], + 'size': limit, + 'query': { + "bool": { + "must": [ + {"match": {index: query}}, + ], + 'filter': filter, + } + }, + } + + resp = self._client.search(index=self._config.index_name, body=query) + list_of_hits = resp['hits']['hits'] + + da = DocumentArray() + for result in list_of_hits[:limit]: + doc = Document.from_base64(result['_source']['blob']) + doc.scores['score'] = NamedScore(value=result['_score']) + da.append(doc) + + return da + + def _find_by_text( + self, + query: Union[str, List[str]], + index: str = 'text', + filter: Union[dict, list] = None, + limit: int = 10, + ): + if isinstance(query, str): + query = [query] + + return [ + self._find_similar_documents_from_text( + q, + index=index, + filter=filter, + limit=limit, + ) + for q in query + ] + + def _find( + self, + query: 'OpenSearchArrayType', + limit: int = 10, + filter: Optional[Dict] = None, + **kwargs, + ) -> List['DocumentArray']: + """Returns approximate nearest neighbors given a batch of input queries. + :param query: input supported to be stored in OpenSearch. This includes any from the list '[np.ndarray, tensorflow.Tensor, torch.Tensor, Sequence[float]]' + :param limit: number of retrieved items + :param filter: filter query used for pre-filtering + :return: DocumentArray containing the closest documents to the query if it is a single query, otherwise a list of DocumentArrays containing + the closest Document objects for each of the queries in `query`. 
+ """ + query = np.array(query).astype(np.float) + num_rows, n_dim = ndarray.get_array_rows(query) + if n_dim != 2: + query = query.reshape((num_rows, -1)) + + return [ + self._find_similar_vectors(q, filter=filter, limit=limit, **kwargs) + for q in query + ] + + def _find_with_filter(self, query: Dict, limit: Optional[Union[int, float]] = 20): + resp = self._client.search( + index=self._config.index_name, body={'query': query, 'size': limit} + ) + list_of_hits = resp['hits']['hits'] + + da = DocumentArray() + for result in list_of_hits[:limit]: + doc = Document.from_base64(result['_source']['blob']) + doc.scores['score'] = NamedScore(value=result['_score']) + da.append(doc) + + return da + + def _filter( + self, query: Dict, limit: Optional[Union[int, float]] = 20 + ) -> 'DocumentArray': + + return self._find_with_filter(query, limit=limit) diff --git a/docarray/array/storage/opensearch/getsetdel.py b/docarray/array/storage/opensearch/getsetdel.py new file mode 100644 index 00000000000..82a0b7636af --- /dev/null +++ b/docarray/array/storage/opensearch/getsetdel.py @@ -0,0 +1,123 @@ +from typing import Sequence, Iterable, Dict, List + +from docarray.array.storage.base.getsetdel import BaseGetSetDelMixin +from docarray.array.storage.base.helper import Offset2ID +from docarray import Document +import numpy as np + + +class GetSetDelMixin(BaseGetSetDelMixin): + + MAX_OPENSEARCH_RETURNED_DOCS = 10000 + + def _getitem(self, doc_id: str) -> 'Document': + """Helper method for getting item with OpenSearch as storage + :param doc_id: id of the document + :raises KeyError: raise error when opensearch id does not exist in storage + :return: Document + """ + try: + result = self._client.get(index=self._config.index_name, id=doc_id) + doc = Document.from_base64(result['_source']['blob']) + return doc + except Exception as ex: + raise KeyError(doc_id) from ex + + def _get_doc_by_id(self, _id: str) -> 'Document': + """Concrete implementation of base class' ``_get_doc_by_id`` + :param _id: the id of the document + :return: the retrieved document from opensearch + """ + return self._getitem(_id) + + def _get_docs_by_ids(self, ids: Sequence[str]) -> Iterable[Document]: + """Concrete implementation of base class' ``_get_docs_by_ids`` + :param ids: ids of the document + :return: Iterable[Document] + """ + accumulated_docs = [] + accumulated_docs_id_not_found = [] + + if not ids: + return accumulated_docs + + # Handle if doc len is more than MAX_ES_RETURNED_DOCS + for pos in range(0, len(ids), self.MAX_OPENSEARCH_RETURNED_DOCS): + es_docs = self._client.mget( + body={'ids': ids[pos : pos + self.MAX_OPENSEARCH_RETURNED_DOCS]}, + index=self._config.index_name, + )['docs'] + for doc in es_docs: + if doc['found']: + accumulated_docs.append( + Document.from_base64(doc['_source']['blob']) + ) + else: + accumulated_docs_id_not_found.append(doc['_id']) + + if accumulated_docs_id_not_found: + raise KeyError(accumulated_docs_id_not_found, accumulated_docs) + + return accumulated_docs + + def _del_doc_by_id(self, _id: str): + """Concrete implementation of base class' ``_del_doc_by_id`` + :param _id: the id of the document to delete + """ + if self._doc_id_exists(_id): + self._client.delete(index=self._config.index_name, id=_id) + self._refresh(self._config.index_name) + + def _set_doc_by_id(self, _id: str, value: Document): + """Concrete implementation of base class' ``_set_doc_by_id`` + :param _id: the id of doc to update + :param value: the document to update to + """ + if _id != value.id: + self._del_doc_by_id(_id) + 
+ + request = [self._document_to_opensearch_request(value)] + + self._send_requests(request) + self._refresh(self._config.index_name) + + def _set_docs_by_ids(self, ids, docs: Iterable[Document], mismatch_ids: Dict): + """Overridden implementation of ``_set_docs_by_ids`` in order to add docs in batches and flush at the end + :param ids: the ids used for indexing + """ + for _id, doc in zip(ids, docs): + self._set_doc_by_id(_id, doc) + + self._refresh(self._config.index_name) + + def _load_offset2ids(self): + if self._list_like: + ids = self._get_offset2ids_meta() + self._offset2ids = Offset2ID(ids, list_like=self._list_like) + else: + self._offset2ids = Offset2ID([], list_like=self._list_like) + + def _save_offset2ids(self): + if self._list_like: + self._update_offset2ids_meta() + + def _document_to_opensearch_request(self, doc: Document) -> Dict: + extra_columns = { + col: doc.tags.get(col) for col, _ in self._config.columns.items() + } + request = { + '_op_type': 'index', + '_id': doc.id, + '_index': self._config.index_name, + 'embedding': self._map_embedding(doc.embedding), + 'blob': doc.to_base64(), + **extra_columns, + } + + if self._config.tag_indices: + for index in self._config.tag_indices: + request[index] = doc.tags.get(index) + + if doc.text: + request['text'] = doc.text + return request diff --git a/docarray/array/storage/opensearch/seqlike.py b/docarray/array/storage/opensearch/seqlike.py new file mode 100644 index 00000000000..33f834e291c --- /dev/null +++ b/docarray/array/storage/opensearch/seqlike.py @@ -0,0 +1,102 @@ +from typing import Iterable, Iterator, Union, TYPE_CHECKING, List, Dict +from docarray.array.storage.base.seqlike import BaseSequenceLikeMixin +import warnings + +from docarray import Document + + +class SequenceLikeMixin(BaseSequenceLikeMixin): + """Implement sequence-like methods for DocumentArray with OpenSearch as storage""" + + def __eq__(self, other): + """Compare this object to the other, returns True if and only if other + has the same type as self and other has the same meta information + :param other: the other object to check for equality + :return: ``True`` if other is equal to self + """ + # two DocumentArrays are considered equal if they have the same client meta data + return ( + type(self) is type(other) + and self._client.get_meta() == other._client.get_meta() + and self._config == other._config + ) + + def __len__(self): + """Return the length of :class:`DocumentArray` that uses OpenSearch as storage + :return: the length of this :class:`DocumentArrayOpenSearch` object + """ + try: + return self._client.count(index=self._config.index_name)["count"] + except Exception: + return 0 + + def __contains__(self, x: Union[str, 'Document']): + """Check if ``x`` is contained in this :class:`DocumentArray` with OpenSearch storage + :param x: the id of the document to check or the document object itself + :return: True if ``x`` is contained in self + """ + if isinstance(x, str): + return self._doc_id_exists(x) + elif isinstance(x, Document): + return self._doc_id_exists(x.id) + else: + return False + + def __repr__(self): + """Return the string representation of :class:`DocumentArrayOpenSearch` object + :return: string representation of this object + """ + return f'<{self.__class__.__name__} (length={len(self)}) at {id(self)}>' + + @staticmethod + def _parse_index_ids_from_bulk_info( + accumulated_info: List[Dict], + ) -> Dict[str, List[str]]: + """Parse ids from the bulk info of successful OpenSearch operations + :param accumulated_info:
accumulated bulk info of the successful operations + :return: dict containing the successfully indexed ids for each operation type + """ + + parsed_ids = {} + + for info in accumulated_info: + for _op_type in info.keys(): + if '_id' in info[_op_type]: + if _op_type not in parsed_ids: + parsed_ids[_op_type] = [] + + parsed_ids[_op_type].append(info[_op_type]['_id']) + + return parsed_ids + + def _upload_batch(self, docs: Iterable['Document'], **kwargs) -> List[str]: + requests = [self._document_to_opensearch_request(doc) for doc in docs] + accumulated_info = self._send_requests(requests, **kwargs) + self._refresh(self._config.index_name) + + successful_ids = self._parse_index_ids_from_bulk_info(accumulated_info) + if 'index' not in successful_ids: + return [] + + return successful_ids['index'] + + def _extend(self, docs: Iterable['Document'], **kwargs): + docs = list(docs) + successful_indexed_ids = self._upload_batch(docs, **kwargs) + if self._list_like: + self._offset2ids.extend( + [ + _id + for _id in successful_indexed_ids + if _id not in self._offset2ids.ids + ] + ) + + if len(successful_indexed_ids) != len(docs): + doc_ids = [doc.id for doc in docs] + failed_index_ids = set(doc_ids) - set(successful_indexed_ids) + + err_msg = f'failed to add Documents with ids: {failed_index_ids}' + warnings.warn(err_msg) + raise IndexError(err_msg) diff --git a/docs/advanced/document-store/extend.md b/docs/advanced/document-store/extend.md index 591d2ce8832..1a815593edd 100644 --- a/docs/advanced/document-store/extend.md +++ b/docs/advanced/document-store/extend.md @@ -279,7 +279,7 @@ class FindMixin: ... def _find( - self, query: 'ElasticArrayType', limit: int = 10, **kwargs + self, query: 'OpenSearchArrayType', limit: int = 10, **kwargs ) -> Union['DocumentArray', List['DocumentArray']]: """Returns `limit` approximate nearest neighbors given a batch of input queries. If the query is a single query, should return a DocumentArray, otherwise a list of DocumentArrays containing diff --git a/docs/advanced/document-store/opensearch.md b/docs/advanced/document-store/opensearch.md new file mode 100644 index 00000000000..6347c5ba0d2 --- /dev/null +++ b/docs/advanced/document-store/opensearch.md @@ -0,0 +1,412 @@ +(opensearch)= + +# OpenSearch + +You can use [OpenSearch](https://opensearch.org/) as the document store for DocumentArray. It is useful when you want faster Document retrieval on embeddings, i.e. `.match()`, `.find()`. + +````{tip} +This feature requires `opensearch-py`. You can install it via `pip install "docarray[opensearch]"`. +```` + +## Usage + +### Start OpenSearch service + +To use OpenSearch as the storage backend, you need a running OpenSearch service. Create `docker-compose.yml` as follows: + +```yaml +version: "3.3" +services: + opensearch: + image: opensearchproject/opensearch:2.4.0 + environment: + - plugins.security.disabled=true + - discovery.type=single-node + ports: + - "9200:9200" + networks: + - os + +networks: + os: + name: os +``` + +Then + +```bash +pip install -U docarray[opensearch] +docker-compose up +``` + +### Create DocumentArray with OpenSearch backend + +Assuming the service is started using the configuration above (i.e.
server address is `http://localhost:9200`), you can instantiate a DocumentArray with OpenSearch storage as follows: + +```python +from docarray import DocumentArray + +da = DocumentArray( + storage='opensearch', + config={'n_dim': 128, 'hosts': 'http://localhost:9200'}, +) +``` + +The usage would be the same as the ordinary DocumentArray, but the dimension of an embedding for a Document must be provided at creation time. Note that the config default for `hosts` is `http://localhost:9900`, so pass the address explicitly if your service runs elsewhere. + +### Secure connection + +By default, the OpenSearch server runs with a security layer that disables plain HTTP connections. You can pass the credentials inside the `hosts` URL and pass `ca_certs` inside `opensearch_config` to the constructor. For example, + +```python +from docarray import DocumentArray + +da = DocumentArray( + storage='opensearch', + config={ + 'hosts': 'https://opensearch:PRq7je_hJ4i4auh+Hq+*@localhost:9200', + 'n_dim': 128, + 'opensearch_config': {'ca_certs': '/Users/hanxiao/http_ca.crt'}, + }, +) +``` + +See [the official documentation](https://opensearch.org/docs/2.0/security-plugin/configuration/generate-certificates/) on how to generate certificates and set up passwords. + +To access a previously persisted DocumentArray, specify its `index_name` and the `hosts`. + +The following example will build a DocumentArray with previously stored data from `old_stuff` on `http://localhost:9200`: + +```python +from docarray import DocumentArray, Document + +da = DocumentArray( + storage='opensearch', + config={'index_name': 'old_stuff', 'n_dim': 128, 'hosts': 'http://localhost:9200'}, +) + +with da: + da.extend([Document() for _ in range(1000)]) + +da2 = DocumentArray( + storage='opensearch', + config={'index_name': 'old_stuff', 'n_dim': 128, 'hosts': 'http://localhost:9200'}, +) + +da2.summary() +``` + +```text + Documents Summary + + Length 1000 + Homogenous Documents True + Common Attributes ('id', 'embedding') + + Attributes Summary + + Attribute Data type #Unique values Has empty value + ───────────────────────────────────────────────────────────── + embedding ('ndarray',) 1000 False + id ('str',) 1000 False + + Storage Summary + + Backend OpenSearch + Host http://localhost:9200 + Distance cosinesimil + Vector dimension 128 + OpenSearch config {} +``` + +Other functions behave the same as for the in-memory DocumentArray. + +### Bulk request customization + +You can customize how bulk requests are sent to OpenSearch when adding Documents by passing additional `kwargs` to the `extend` method call. See [the official documentation](https://opensearch.org/docs/1.2/opensearch/rest-api/document-apis/bulk/) for more details. See the following code for an example: + +```python +from docarray import Document, DocumentArray +import numpy as np + +n_dim = 3 + +da = DocumentArray( + storage='opensearch', + config={'n_dim': 3, 'columns': {'price': 'int'}, 'distance': 'l2'}, +) + +with da: + da.extend( + [ + Document(id=f'r{i}', embedding=i * np.ones(n_dim), tags={'price': i}) + for i in range(10) + ], + thread_count=4, + chunk_size=500, + max_chunk_bytes=104857600, + queue_size=4, + ) +``` + +````{admonition} Note +:class: note +The `batch_size` configuration will be overridden by the `chunk_size` kwarg if provided. +```` + +```{tip} +You can read more about the parallel bulk config and its default values [here](https://github.com/opensearch-project/opensearch-py/blob/main/opensearchpy/helpers/actions.py) +``` + +### Vector search with filter query + +You can perform Approximate Nearest Neighbor Search and pre-filter results using a filter query.
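+Under the hood, `.find` wraps the query vector and your filter into a `script_score` query scored with the `knn_score` script (see `_find_similar_vectors` in this PR). Roughly, the request body looks like this sketch, here with placeholder values borrowed from the example that follows:
+
+```python
+knn_query = {
+    'size': 4,  # limit
+    'query': {
+        'script_score': {
+            # the wrapped query acts as the pre-filter; 'match_all' when no filter is given
+            'query': {'bool': {'filter': {'range': {'price': {'lte': 7}}}}},
+            'script': {
+                'lang': 'knn',
+                'source': 'knn_score',
+                'params': {
+                    'field': 'embedding',  # the knn_vector field of the index
+                    'query_value': [8.0, 8.0, 8.0],  # the query embedding
+                    'space_type': 'l2',  # defaults to the configured distance
+                },
+            },
+        }
+    },
+}
+```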
+ +Consider Documents with embeddings `[0,0,0]` up to `[9,9,9]`, where the Document with embedding `[i,i,i]` has the tag `price` with value `i`. We can create such an example with the following code: + +```python +from docarray import Document, DocumentArray +import numpy as np + +n_dim = 3 + +da = DocumentArray( + storage='opensearch', + config={'n_dim': n_dim, 'columns': {'price': 'int'}, 'distance': 'l2'}, +) + +with da: + da.extend( + [ + Document(id=f'r{i}', embedding=i * np.ones(n_dim), tags={'price': i}) + for i in range(10) + ] + ) + +print('\nIndexed Prices:\n') +for embedding, price in zip(da.embeddings, da[:, 'tags__price']): + print(f'\tembedding={embedding},\t price={price}') +``` + +Suppose we want the nearest vectors to the embedding `[8. 8. 8.]`, with the restriction that the retrieved Documents must satisfy a filter on `price`. As an example, let's require that retrieved Documents have a `price` value lower than or equal to `max_price`. We can encode this information in OpenSearch using `filter = {'range': {'price': {'lte': max_price}}}`. + +The search with the proposed filter can then be implemented and used with the following code: + +```python +max_price = 7 +n_limit = 4 + +np_query = np.ones(n_dim) * 8 +print(f'\nQuery vector: \t{np_query}') + +filter = {'range': {'price': {'lte': max_price}}} +results = da.find(np_query, filter=filter, limit=n_limit) + +print('\nEmbeddings Nearest Neighbours with "price" at most 7:\n') +for embedding, price in zip(results.embeddings, results[:, 'tags__price']): + print(f'\tembedding={embedding},\t price={price}') +``` + +This would print: + +```bash +Embeddings Nearest Neighbours with "price" at most 7: + + embedding=[7. 7. 7.], price=7 + embedding=[6. 6. 6.], price=6 + embedding=[5. 5. 5.], price=5 + embedding=[4. 4. 4.], price=4 +``` + +Additionally, you can tune the underlying HNSW index for speed or accuracy by setting the `ef_construction` and `m` options in the `config`: + +```python +da = DocumentArray( + storage='opensearch', + config={'n_dim': n_dim, 'ef_construction': 1024, 'm': 32}, +) +``` + +```{tip} +You can read more about approximate kNN tuning [here](https://opensearch.org/docs/latest/search-plugins/knn/index/) +``` + +### Search by filter query + +You can search with user-defined query filters using the `.find` method. Such queries can be constructed following the guidelines in [OpenSearch's Documentation](https://opensearch.org/docs/latest/search-plugins/knn/knn-score-script/). + +Suppose you store Documents with a certain tag `price` into OpenSearch and you want to retrieve all Documents with `price` lower than or equal to some `max_price` value.
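+Under the hood, such a filter-only `.find(filter=...)` call is sent as a plain OpenSearch search request by `_find_with_filter` in this PR; a sketch, where `client` and `index_name` stand in for the DocumentArray's own client and index:
+
+```python
+resp = client.search(
+    index=index_name,
+    body={'query': {'range': {'price': {'lte': 3}}}, 'size': 4},  # filter + limit
+)
+```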
+ +You can index such Documents as follows: + +```python +from docarray import Document, DocumentArray + +n_dim = 3 +da = DocumentArray( + storage='opensearch', + config={ + 'n_dim': n_dim, + 'columns': {'price': 'float'}, + }, +) + +with da: + da.extend([Document(id=f'r{i}', tags={'price': i}) for i in range(10)]) + +print('\nIndexed Prices:\n') +for price in da[:, 'tags__price']: + print(f'\t price={price}') +``` + +Then you can retrieve all Documents whose price is lower than or equal to `max_price` by applying the following filter: + +```python +max_price = 3 +n_limit = 4 + +filter = { + 'range': { + 'price': { + 'lte': max_price, + } + } +} +results = da.find(filter=filter, limit=n_limit) + +print('\n Returned examples that verify filter "price at most 3":\n') +for price in results[:, 'tags__price']: + print(f'\t price={price}') +``` + +This would print + +``` + Returned examples that verify filter "price at most 3": + + price=0 + price=1 + price=2 + price=3 +``` + +### Search by `.text` field + +Text search can easily be leveraged in a `DocumentArray` with `storage='opensearch'`. +To do this, text needs to be indexed by setting the boolean flag `index_text` when the `DocumentArray` is created, i.e. `config={'index_text': True, ...}`. +The following example builds a `DocumentArray` with several Documents containing text and searches for those that have `pizza` in their text description. + +```python +from docarray import DocumentArray, Document + +da = DocumentArray(storage='opensearch', config={'n_dim': 2, 'index_text': True}) +with da: + da.extend( + [ + Document(text='Person eating'), + Document(text='Person eating pizza'), + Document(text='Pizza restaurant'), + ] + ) + +pizza_docs = da.find('pizza') +pizza_docs[:, 'text'] +``` + +will print + +```text +['Pizza restaurant', 'Person eating pizza'] +``` + +### Search by `.tags` field + +Text can also be indexed when it is part of `tags`. +This is mostly useful in applications where text data can be split into groups, and items need to be retrieved based on a text search within a specific tag. + +For example: + +```python +from docarray import DocumentArray, Document + +da = DocumentArray( + storage='opensearch', config={'n_dim': 32, 'tag_indices': ['food_type', 'price']} +) +with da: + da.extend( + [ + Document( + tags={ + 'food_type': 'Italian and Spanish food', + 'price': 'cheap but not that cheap', + }, + ), + Document( + tags={ + 'food_type': 'French and Italian food', + 'price': 'on the expensive side', + }, + ), + Document( + tags={ + 'food_type': 'Chinese noodles', + 'price': 'quite cheap for what you get!', + }, + ), + ] + ) + +results_cheap = da.find('cheap', index='price') +print('searching "cheap" in :\n\t', results_cheap[:, 'tags__price']) + +results_italian = da.find('italian', index='food_type') +print('searching "italian" in :\n\t', results_italian[:, 'tags__food_type']) +``` + +will print + +```text +searching "cheap" in : + ['cheap but not that cheap', 'quite cheap for what you get!'] +searching "italian" in : + ['Italian and Spanish food', 'French and Italian food'] +``` + +````{admonition} Note +:class: note +By default, if you don't specify the parameter `index` in the `find` method, the Document attribute `text` will be used for search.
If you want to use a specific tags field, make sure to specify it with the parameter `index`: +```python +results = da.find('cheap', index='price') +``` +```` + +## Configuration + +| Name | Description | Default | +|---------------------|-----------------------------------------------------------------------------------------------------------------------------------------|-------------------------| +| `hosts` | Host address(es) of the OpenSearch server | `http://localhost:9900` | +| `opensearch_config` | Other OpenSearch options in a Dict, passed to the `OpenSearch` client constructor, e.g. `ca_certs`, `http_auth` | None | +| `index_name` | OpenSearch index name used to store and access this DocumentArray | None | +| `n_dim` | Dimensionality of the embeddings | None | +| `distance` | Similarity metric in OpenSearch | `cosinesimil` | +| `ef_construction` | The size of the dynamic list used while constructing the HNSW graph | `512` | +| `m` | The maximum number of bidirectional links per element in the HNSW graph | `16` | +| `index_text` | Boolean flag indicating whether to index `.text` or not | False | +| `tag_indices` | List of tag fields to index as text | `[]` | +| `batch_size` | Batch size used to handle storage refreshes/updates | 64 | +| `list_like` | Controls if ordering of Documents is persisted in the Database. Disabling this breaks list-like features, but can improve performance. | True | + +```{tip} +You can read more about HNSW parameters and their default values [here](https://opensearch.org/docs/latest/search-plugins/knn/knn-index/) +``` + +```{tip} +Note that it is plural `hosts`, not `host`, to comply with the OpenSearch client's interface. +``` diff --git a/setup.py b/setup.py index 9e1daca7f94..63ce1795748 100644 --- a/setup.py +++ b/setup.py @@ -84,6 +84,7 @@ 'milvus': [ 'pymilvus~=2.1.0', ], + 'opensearch': ['opensearch-py==2.0.1'], 'benchmark': [ 'pandas', 'matplotlib', @@ -115,6 +116,7 @@ 'elasticsearch>=8.2.0', 'redis>=4.3.0', 'pymilvus==2.1.3', + 'opensearch-py==2.0.1', 'jina', 'pytest-mock', ], diff --git a/tests/unit/array/docker-compose.yml b/tests/unit/array/docker-compose.yml index 0c384989043..cb21be2ff8a 100644 --- a/tests/unit/array/docker-compose.yml +++ b/tests/unit/array/docker-compose.yml @@ -31,8 +31,19 @@ services: image: redislabs/redisearch:2.6.0 ports: - "6379:6379" + opensearch: + image: opensearchproject/opensearch:2.4.0 + environment: + - plugins.security.disabled=true + - discovery.type=single-node + ports: + - "9900:9200" + networks: + - os networks: elastic: - name: elastic \ No newline at end of file + name: elastic + os: + name: os \ No newline at end of file diff --git a/tests/unit/array/mixins/test_content.py b/tests/unit/array/mixins/test_content.py index 362d0c488e9..cb7f0ddb7b6 100644 --- a/tests/unit/array/mixins/test_content.py +++ b/tests/unit/array/mixins/test_content.py @@ -11,6 +11,7 @@ from docarray.array.elastic import DocumentArrayElastic, ElasticConfig from docarray.array.redis import DocumentArrayRedis, RedisConfig from docarray.array.milvus import DocumentArrayMilvus, MilvusConfig +from docarray.array.opensearch import DocumentArrayOpenSearch, OpenSearchConfig @pytest.mark.parametrize( @@ -24,6 +25,7 @@ DocumentArrayElastic, DocumentArrayRedis, DocumentArrayMilvus, + DocumentArrayOpenSearch, ], ) @pytest.mark.parametrize( @@ -37,6 +39,7 @@ def test_content_empty_getter_return_none(cls, content_attr, start_storage): DocumentArrayElastic, DocumentArrayRedis,
DocumentArrayMilvus, + DocumentArrayOpenSearch, ]: da = cls(config={'n_dim': 3}) else: @@ -55,6 +58,7 @@ def test_content_empty_getter_return_none(cls, content_attr, start_storage): DocumentArrayElastic, DocumentArrayRedis, DocumentArrayMilvus, + DocumentArrayOpenSearch, ], ) @pytest.mark.parametrize( @@ -75,6 +79,7 @@ def test_content_empty_setter(cls, content_attr, start_storage): DocumentArrayElastic, DocumentArrayRedis, DocumentArrayMilvus, + DocumentArrayOpenSearch, ]: da = cls(config={'n_dim': 3}) else: @@ -94,6 +99,7 @@ def test_content_empty_setter(cls, content_attr, start_storage): (DocumentArrayElastic, ElasticConfig(n_dim=128)), (DocumentArrayRedis, RedisConfig(n_dim=128)), (DocumentArrayMilvus, MilvusConfig(n_dim=128)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)), ], ) @pytest.mark.parametrize( @@ -130,6 +136,7 @@ def test_content_getter_setter(cls, content_attr, config, start_storage): (DocumentArrayElastic, ElasticConfig(n_dim=128)), (DocumentArrayRedis, RedisConfig(n_dim=128)), (DocumentArrayMilvus, MilvusConfig(n_dim=128)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)), ], ) def test_content_empty(da_len, da_cls, config, start_storage): @@ -169,6 +176,7 @@ def test_content_empty(da_len, da_cls, config, start_storage): (DocumentArrayElastic, ElasticConfig(n_dim=5)), (DocumentArrayRedis, RedisConfig(n_dim=128)), (DocumentArrayMilvus, MilvusConfig(n_dim=5)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=5)), ], ) def test_embeddings_setter(da_len, da_cls, config, start_storage): diff --git a/tests/unit/array/mixins/test_del.py b/tests/unit/array/mixins/test_del.py index aee552af9b4..7d34f99ec7f 100644 --- a/tests/unit/array/mixins/test_del.py +++ b/tests/unit/array/mixins/test_del.py @@ -117,6 +117,7 @@ def test_del_da_attribute(): ('annlite', {'n_dim': 3, 'metric': 'Euclidean'}), ('qdrant', {'n_dim': 3, 'distance': 'euclidean'}), ('elasticsearch', {'n_dim': 3, 'distance': 'l2_norm'}), + ('opensearch', {'n_dim': 3, 'distance': 'l2'}), ('sqlite', dict()), ('redis', {'n_dim': 3, 'distance': 'L2'}), ('milvus', {'n_dim': 3, 'distance': 'L2'}), diff --git a/tests/unit/array/mixins/test_empty.py b/tests/unit/array/mixins/test_empty.py index c92937cf80d..3030eaefbd6 100644 --- a/tests/unit/array/mixins/test_empty.py +++ b/tests/unit/array/mixins/test_empty.py @@ -1,6 +1,7 @@ import pytest from docarray import DocumentArray +from docarray.array.opensearch import DocumentArrayOpenSearch, OpenSearchConfig from docarray.array.qdrant import DocumentArrayQdrant from docarray.array.sqlite import DocumentArraySqlite from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig @@ -23,6 +24,7 @@ (DocumentArrayElastic, ElasticConfig(n_dim=5)), (DocumentArrayRedis, RedisConfig(n_dim=5)), (DocumentArrayMilvus, MilvusConfig(n_dim=5)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=5)), ], ) def test_empty_non_zero(da_cls, config, start_storage): diff --git a/tests/unit/array/mixins/test_find.py b/tests/unit/array/mixins/test_find.py index e8ea1e225ac..9a7a05df0d0 100644 --- a/tests/unit/array/mixins/test_find.py +++ b/tests/unit/array/mixins/test_find.py @@ -33,6 +33,7 @@ def inv_cosine(*args): ('annlite', {'n_dim': 32}), ('qdrant', {'n_dim': 32}), ('elasticsearch', {'n_dim': 32}), + ('opensearch', {'n_dim': 32}), ('redis', {'n_dim': 32}), ('milvus', {'n_dim': 32}), ], @@ -87,6 +88,10 @@ def test_find(storage, config, limit, query, start_storage): cosine_similarities = [t['score'].value for t in result[:, 'scores']] assert sorted(cosine_similarities, 
reverse=True) == cosine_similarities assert len(cosine_similarities) == limit + elif storage == 'opensearch': + cosine_similarities = [t['score'].value for t in result[:, 'scores']] + assert sorted(cosine_similarities, reverse=True) == cosine_similarities + assert len(cosine_similarities) == limit elif storage == 'redis': cosine_distances = [t['score'].value for t in result[:, 'scores']] assert sorted(cosine_distances, reverse=False) == cosine_distances @@ -113,6 +118,11 @@ def test_find(storage, config, limit, query, start_storage): cosine_similarities = [t['score'].value for t in da[:, 'scores']] assert sorted(cosine_similarities, reverse=True) == cosine_similarities assert len(cosine_similarities) == limit + elif storage == 'opensearch': + for da in result: + cosine_similarities = [t['score'].value for t in da[:, 'scores']] + assert sorted(cosine_similarities, reverse=True) == cosine_similarities + assert len(cosine_similarities) == limit elif storage == 'redis': for da in result: cosine_distances = [t['score'].value for t in da[:, 'scores']] @@ -129,6 +139,7 @@ def test_find(storage, config, limit, query, start_storage): 'storage, config', [ ('elasticsearch', {'n_dim': 32, 'index_text': True}), + ('opensearch', {'n_dim': 32, 'index_text': True}), ('redis', {'n_dim': 32, 'index_text': True}), ], ) @@ -202,6 +213,35 @@ def test_find_by_text(storage, config, start_storage): } ], ), + ( + 'opensearch', + {'n_dim': 32, 'columns': {'i': 'int'}, 'index_text': True}, + None, + ), + ( + 'opensearch', + {'n_dim': 32, 'columns': {'i': 'int'}, 'index_text': True}, + { + 'range': { + 'i': { + 'lte': 5, + } + } + }, + ), + ( + 'opensearch', + {'n_dim': 32, 'columns': {'i': 'int'}, 'index_text': True}, + [ + { + 'range': { + 'i': { + 'lte': 5, + } + } + } + ], + ), ('redis', {'n_dim': 32, 'columns': {'i': 'int'}, 'index_text': True}, None), ( 'redis', @@ -235,6 +275,7 @@ def test_find_by_text_and_filter(storage, config, filter, start_storage): 'storage, config', [ ('elasticsearch', {'n_dim': 32, 'tag_indices': ['attr1', 'attr2', 'attr3']}), + ('opensearch', {'n_dim': 32, 'tag_indices': ['attr1', 'attr2', 'attr3']}), ( 'redis', {'n_dim': 32, 'tag_indices': ['attr1', 'attr2', 'attr3']}, @@ -355,6 +396,15 @@ def test_find_by_tag(storage, config, start_storage): 'eq': operator.eq, } +numeric_operators_opensearch = { + 'gte': operator.ge, + 'gt': operator.gt, + 'lte': operator.le, + 'lt': operator.lt, + 'eq': operator.eq, +} + + numeric_operators_redis = { 'gte': operator.ge, 'gt': operator.gt, @@ -454,6 +504,23 @@ def test_find_by_tag(storage, config, start_storage): ) for operator in ['gt', 'gte', 'lt', 'lte'] ], + *[ + tuple( + [ + 'opensearch', + lambda operator, threshold: { + 'range': { + 'price': { + operator: threshold, + } + } + }, + numeric_operators_opensearch, + operator, + ] + ) + for operator in ['gt', 'gte', 'lt', 'lte'] + ], *[ ( 'redis', @@ -579,6 +646,34 @@ def test_search_pre_filtering( ) for operator in ['gt', 'gte', 'lt', 'lte'] ], + *[ + tuple( + [ + 'opensearch', + lambda operator, threshold: {'match': {'price': threshold}}, + numeric_operators_elasticsearch, + operator, + ] + ) + for operator in ['eq'] + ], + *[ + tuple( + [ + 'opensearch', + lambda operator, threshold: { + 'range': { + 'price': { + operator: threshold, + } + } + }, + numeric_operators_elasticsearch, + operator, + ] + ) + for operator in ['gt', 'gte', 'lt', 'lte'] + ], *[ tuple( [ @@ -848,10 +943,11 @@ def test_unsupported_pre_filtering(storage, start_storage, columns): 'storage, config', [ ('elasticsearch', 
{'n_dim': 32, 'index_text': False}), + ('opensearch', {'n_dim': 32, 'index_text': False}), ], ) @pytest.mark.parametrize('limit', [1, 5, 10]) -def test_elastic_id_filter(storage, config, limit): +def test_elastic_os_id_filter(storage, config, limit): da = DocumentArray(storage=storage, config=config) da.extend([Document(id=f'{i}', embedding=np.random.rand(32)) for i in range(50)]) id_list = [np.random.choice(50, 10, replace=False) for _ in range(3)] @@ -874,6 +970,7 @@ def test_elastic_id_filter(storage, config, limit): ('annlite', {'n_dim': 3, 'metric': 'Euclidean'}), ('qdrant', {'n_dim': 3, 'distance': 'euclidean'}), ('elasticsearch', {'n_dim': 3, 'distance': 'l2_norm'}), + ('opensearch', {'n_dim': 3, 'distance': 'l2'}), ('sqlite', dict()), ('redis', {'n_dim': 3, 'distance': 'L2'}), ('milvus', {'n_dim': 3, 'distance': 'L2'}), @@ -889,6 +986,7 @@ def test_find_subindex(storage, config, start_storage): 'annlite', 'qdrant', 'elasticsearch', + 'opensearch', 'redis', 'milvus', ]: @@ -920,6 +1018,8 @@ def test_find_subindex(storage, config, start_storage): else: closest_docs = da.find(query=np.array([3, 3]), on='@c') + print('CLOSEST', closest_docs[0]) + b = closest_docs[0].embedding == [2, 2] if isinstance(b, bool): assert b @@ -937,6 +1037,7 @@ def test_find_subindex(storage, config, start_storage): ('annlite', {'n_dim': 3, 'metric': 'Euclidean'}), ('qdrant', {'n_dim': 3, 'distance': 'euclidean'}), ('elasticsearch', {'n_dim': 3, 'distance': 'l2_norm'}), + ('opensearch', {'n_dim': 3, 'distance': 'l2'}), ('sqlite', dict()), ('redis', {'n_dim': 3, 'distance': 'L2'}), ('milvus', {'n_dim': 3, 'distance': 'L2'}), @@ -1028,6 +1129,7 @@ class MMDoc: ('sqlite', dict(), {'@c': dict()}), ('qdrant', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ('elasticsearch', {'n_dim': 3}, {'@c': {'n_dim': 3}}), + ('opensearch', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ('redis', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ('milvus', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ], @@ -1092,6 +1194,7 @@ def test_find_return_root(storage, config, subindex_configs, start_storage): ('sqlite', dict(), {'@c': dict()}), ('qdrant', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ('elasticsearch', {'n_dim': 3}, {'@c': {'n_dim': 3}}), + ('opensearch', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ('redis', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ('milvus', {'n_dim': 3}, {'@c': {'n_dim': 3}}), ], diff --git a/tests/unit/array/mixins/test_io.py b/tests/unit/array/mixins/test_io.py index 66f22021121..910f2288c11 100644 --- a/tests/unit/array/mixins/test_io.py +++ b/tests/unit/array/mixins/test_io.py @@ -6,9 +6,11 @@ from docarray import Document, DocumentArray from docarray.array.memory import DocumentArrayInMemory +from docarray.array.opensearch import DocumentArrayOpenSearch from docarray.array.qdrant import DocumentArrayQdrant from docarray.array.sqlite import DocumentArraySqlite from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig +from docarray.array.storage.opensearch import OpenSearchConfig from docarray.array.storage.qdrant import QdrantConfig from docarray.array.storage.weaviate import WeaviateConfig from docarray.array.weaviate import DocumentArrayWeaviate @@ -36,6 +38,7 @@ def docs(): (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=10)), (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=10)), (DocumentArrayElastic, lambda: ElasticConfig(n_dim=10)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=10)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=10)), (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=10)), ], @@ -80,6 +83,7 @@ def 
test_document_save_load( (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=10)), (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=10)), (DocumentArrayElastic, lambda: ElasticConfig(n_dim=10)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=10)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=10)), (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=10)), ], @@ -101,6 +105,7 @@ def test_da_csv_write(docs, flatten_tags, tmp_path, da_cls, config, start_storag (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=256)), (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=256)), (DocumentArrayElastic, lambda: ElasticConfig(n_dim=256)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=256)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=256)), (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=256)), ], @@ -120,6 +125,7 @@ def test_from_ndarray(da_cls, config, start_storage): (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=256)), (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=256)), (DocumentArrayElastic, lambda: ElasticConfig(n_dim=256)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=256)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=256)), # (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=256)), # fails on CI but nowhere else ], @@ -162,6 +168,7 @@ def test_from_files_exclude(): (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=256)), (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=256)), (DocumentArrayElastic, lambda: ElasticConfig(n_dim=256)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=256)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=256)), (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=256)), ], @@ -183,6 +190,7 @@ def test_from_ndjson(da_cls, config, start_storage): (DocumentArrayElastic, lambda: ElasticConfig(n_dim=3)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=3)), (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=3)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=3)), ], ) def test_from_to_pd_dataframe(da_cls, config, start_storage): @@ -213,6 +221,7 @@ def test_from_to_pd_dataframe(da_cls, config, start_storage): (DocumentArrayElastic, ElasticConfig(n_dim=3)), (DocumentArrayRedis, RedisConfig(n_dim=3)), (DocumentArrayMilvus, MilvusConfig(n_dim=3)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=3)), ], ) def test_from_to_bytes(da_cls, config, start_storage): @@ -257,6 +266,7 @@ def test_from_to_bytes(da_cls, config, start_storage): (DocumentArrayElastic, lambda: ElasticConfig(n_dim=256)), (DocumentArrayRedis, lambda: RedisConfig(n_dim=256)), (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=256)), + (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=256)), ], ) def test_push_pull_io(da_cls, config, show_progress, start_storage): @@ -296,6 +306,7 @@ def test_push_pull_io(da_cls, config, show_progress, start_storage): (DocumentArrayInMemory, None), (DocumentArraySqlite, None), (DocumentArrayWeaviate, WeaviateConfig(n_dim=3)), + # (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)), # (DocumentArrayAnnlite, PqliteConfig(n_dim=3)), # TODO: enable this # (DocumentArrayQdrant, QdrantConfig(n_dim=3)), # (DocumentArrayElastic, ElasticConfig(n_dim=3)), # Elastic needs config diff --git a/tests/unit/array/mixins/test_magic.py b/tests/unit/array/mixins/test_magic.py index bd8f813813e..bf6595b0433 100644 --- a/tests/unit/array/mixins/test_magic.py +++ b/tests/unit/array/mixins/test_magic.py @@ -1,9 +1,11 @@ import pytest from docarray import DocumentArray, Document +from docarray.array.opensearch import 
DocumentArrayOpenSearch from docarray.array.qdrant import DocumentArrayQdrant from docarray.array.sqlite import DocumentArraySqlite from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig +from docarray.array.storage.opensearch import OpenSearchConfig from docarray.array.storage.qdrant import QdrantConfig from docarray.array.storage.weaviate import WeaviateConfig from docarray.array.weaviate import DocumentArrayWeaviate @@ -34,6 +36,7 @@ def docs(): (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)), (DocumentArrayQdrant, QdrantConfig(n_dim=128)), (DocumentArrayElastic, ElasticConfig(n_dim=128)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)), (DocumentArrayRedis, RedisConfig(n_dim=1)), (DocumentArrayMilvus, MilvusConfig(n_dim=128)), ], @@ -62,6 +65,7 @@ def test_iter_len_bool(da_cls, config, start_storage): (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)), (DocumentArrayQdrant, QdrantConfig(n_dim=128)), (DocumentArrayElastic, ElasticConfig(n_dim=128)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)), (DocumentArrayRedis, RedisConfig(n_dim=128)), (DocumentArrayMilvus, MilvusConfig(n_dim=128)), ], @@ -83,6 +87,7 @@ def test_repr(da_cls, config, start_storage): ('weaviate', WeaviateConfig(n_dim=128)), ('qdrant', QdrantConfig(n_dim=128)), ('elasticsearch', ElasticConfig(n_dim=128)), + ('opensearch', OpenSearchConfig(n_dim=128)), ('redis', RedisConfig(n_dim=128)), ('milvus', MilvusConfig(n_dim=128)), ], @@ -108,6 +113,7 @@ def test_repr_str(docs, storage, config, start_storage): (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)), (DocumentArrayQdrant, QdrantConfig(n_dim=10)), (DocumentArrayElastic, ElasticConfig(n_dim=10)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)), (DocumentArrayRedis, RedisConfig(n_dim=10)), (DocumentArrayMilvus, MilvusConfig(n_dim=10)), ], diff --git a/tests/unit/array/mixins/test_parallel.py b/tests/unit/array/mixins/test_parallel.py index f919e0af70e..9a9d12b6824 100644 --- a/tests/unit/array/mixins/test_parallel.py +++ b/tests/unit/array/mixins/test_parallel.py @@ -5,9 +5,11 @@ import pytest from docarray import DocumentArray, Document +from docarray.array.opensearch import DocumentArrayOpenSearch from docarray.array.qdrant import DocumentArrayQdrant from docarray.array.sqlite import DocumentArraySqlite from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig +from docarray.array.storage.opensearch import OpenSearchConfig from docarray.array.storage.qdrant import QdrantConfig from docarray.array.storage.weaviate import WeaviateConfig from docarray.array.weaviate import DocumentArrayWeaviate @@ -54,6 +56,7 @@ def test_parallel_map_apply_external_pool(pytestconfig, pool): (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)), (DocumentArrayQdrant, QdrantConfig(n_dim=10)), (DocumentArrayElastic, ElasticConfig(n_dim=10)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)), (DocumentArrayRedis, RedisConfig(n_dim=10)), (DocumentArrayMilvus, MilvusConfig(n_dim=10)), ], @@ -112,6 +115,7 @@ def test_parallel_map( (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)), (DocumentArrayQdrant, QdrantConfig(n_dim=10)), (DocumentArrayElastic, ElasticConfig(n_dim=10)), + (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)), (DocumentArrayRedis, RedisConfig(n_dim=10)), (DocumentArrayMilvus, MilvusConfig(n_dim=10)), ], @@ -185,6 +189,7 @@ def test_parallel_map_batch( (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)), (DocumentArrayQdrant, QdrantConfig(n_dim=10)), (DocumentArrayElastic, ElasticConfig(n_dim=10)), + 
diff --git a/tests/unit/array/mixins/test_parallel.py b/tests/unit/array/mixins/test_parallel.py
index f919e0af70e..9a9d12b6824 100644
--- a/tests/unit/array/mixins/test_parallel.py
+++ b/tests/unit/array/mixins/test_parallel.py
@@ -5,9 +5,11 @@
 import pytest
 
 from docarray import DocumentArray, Document
+from docarray.array.opensearch import DocumentArrayOpenSearch
 from docarray.array.qdrant import DocumentArrayQdrant
 from docarray.array.sqlite import DocumentArraySqlite
 from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig
+from docarray.array.storage.opensearch import OpenSearchConfig
 from docarray.array.storage.qdrant import QdrantConfig
 from docarray.array.storage.weaviate import WeaviateConfig
 from docarray.array.weaviate import DocumentArrayWeaviate
@@ -54,6 +56,7 @@ def test_parallel_map_apply_external_pool(pytestconfig, pool):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=10)),
         (DocumentArrayElastic, ElasticConfig(n_dim=10)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)),
         (DocumentArrayRedis, RedisConfig(n_dim=10)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=10)),
     ],
@@ -112,6 +115,7 @@ def test_parallel_map(
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=10)),
         (DocumentArrayElastic, ElasticConfig(n_dim=10)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)),
         (DocumentArrayRedis, RedisConfig(n_dim=10)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=10)),
     ],
@@ -185,6 +189,7 @@ def test_parallel_map_batch(
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=10)),
         (DocumentArrayElastic, ElasticConfig(n_dim=10)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)),
         (DocumentArrayRedis, RedisConfig(n_dim=10)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=10)),
     ],
@@ -215,6 +220,7 @@ def test_map_lambda(pytestconfig, da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=10)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=10)),
         (DocumentArrayElastic, ElasticConfig(n_dim=10)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=10)),
         (DocumentArrayRedis, RedisConfig(n_dim=10)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=10)),
     ],
@@ -246,6 +252,7 @@ def test_apply_partial(pytestconfig, da_cls, config, start_storage):
         ('weaviate', WeaviateConfig(n_dim=256)),
         ('qdrant', QdrantConfig(n_dim=256)),
         ('elasticsearch', ElasticConfig(n_dim=256)),
+        ('opensearch', OpenSearchConfig(n_dim=256)),
         ('redis', RedisConfig(n_dim=256)),
         ('milvus', MilvusConfig(n_dim=256)),
     ],
diff --git a/tests/unit/array/mixins/test_plot.py b/tests/unit/array/mixins/test_plot.py
index 366c33bb0b3..cd734538990 100644
--- a/tests/unit/array/mixins/test_plot.py
+++ b/tests/unit/array/mixins/test_plot.py
@@ -6,8 +6,10 @@
 import pytest
 
 from docarray import DocumentArray, Document
+from docarray.array.opensearch import DocumentArrayOpenSearch
 from docarray.array.qdrant import DocumentArrayQdrant
 from docarray.array.sqlite import DocumentArraySqlite
+from docarray.array.storage.opensearch import OpenSearchConfig
 from docarray.array.storage.qdrant import QdrantConfig
 from docarray.array.storage.weaviate import WeaviateConfig
 from docarray.array.weaviate import DocumentArrayWeaviate
@@ -29,6 +31,7 @@
     # (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
     (DocumentArrayQdrant, QdrantConfig(n_dim=128, scroll_batch_size=8)),
     (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+    (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
     (DocumentArrayRedis, RedisConfig(n_dim=128)),
     # (DocumentArrayMilvus, MilvusConfig(n_dim=128)),  # tensor is too large to handle
 ],
@@ -69,6 +72,7 @@ def test_sprite_fail_tensor_success_uri(
         (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=128, scroll_batch_size=8)),
         (DocumentArrayElastic, lambda: ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, lambda: RedisConfig(n_dim=128)),
         # (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=128)),
     ],
@@ -158,6 +162,7 @@ def _test_plot_embeddings(da):
         (DocumentArrayWeaviate, lambda: WeaviateConfig(n_dim=5)),
         (DocumentArrayQdrant, lambda: QdrantConfig(n_dim=5)),
         (DocumentArrayElastic, lambda: ElasticConfig(n_dim=5)),
+        (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=5)),
         (DocumentArrayRedis, lambda: RedisConfig(n_dim=5)),
         (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=5)),
     ],
@@ -191,6 +196,7 @@ def test_plot_embeddings_same_path(tmpdir, da_cls, config_gen, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -221,6 +227,7 @@ def test_summary_homo_hetero(da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
diff --git a/tests/unit/array/mixins/test_sample.py b/tests/unit/array/mixins/test_sample.py
index fa75064fdb7..d1451340225 100644
--- a/tests/unit/array/mixins/test_sample.py
+++ b/tests/unit/array/mixins/test_sample.py
@@ -1,9 +1,11 @@
 import pytest
 
 from docarray import DocumentArray
+from docarray.array.opensearch import DocumentArrayOpenSearch
 from docarray.array.qdrant import DocumentArrayQdrant
 from docarray.array.sqlite import DocumentArraySqlite
 from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig
+from docarray.array.storage.opensearch import OpenSearchConfig
 from docarray.array.storage.qdrant import QdrantConfig
 from docarray.array.storage.weaviate import WeaviateConfig
 from docarray.array.weaviate import DocumentArrayWeaviate
@@ -21,6 +23,7 @@
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -48,6 +51,7 @@ def test_sample(da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -74,6 +78,7 @@ def test_sample_with_seed(da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -101,6 +106,7 @@ def test_shuffle(da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
diff --git a/tests/unit/array/mixins/test_text.py b/tests/unit/array/mixins/test_text.py
index 9d5e42ac2b3..33ad4af312f 100644
--- a/tests/unit/array/mixins/test_text.py
+++ b/tests/unit/array/mixins/test_text.py
@@ -2,9 +2,11 @@
 import pytest
 
 from docarray import DocumentArray, Document
+from docarray.array.opensearch import DocumentArrayOpenSearch
 from docarray.array.qdrant import DocumentArrayQdrant
 from docarray.array.sqlite import DocumentArraySqlite
 from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig
+from docarray.array.storage.opensearch import OpenSearchConfig
 from docarray.array.storage.qdrant import QdrantConfig
 from docarray.array.storage.weaviate import WeaviateConfig
 from docarray.array.weaviate import DocumentArrayWeaviate
@@ -32,6 +34,7 @@ def docs():
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -62,6 +65,7 @@ def test_da_vocabulary(da_cls, config, docs, min_freq, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -92,6 +96,7 @@ def test_da_text_to_tensor_non_max_len(docs, da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -124,6 +129,7 @@ def test_da_text_to_tensor_max_len_3(docs, da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
@@ -156,6 +162,7 @@ def test_da_text_to_tensor_max_len_1(docs, da_cls, config, start_storage):
         (DocumentArrayWeaviate, WeaviateConfig(n_dim=128)),
         (DocumentArrayQdrant, QdrantConfig(n_dim=128)),
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
     ],
diff --git a/tests/unit/array/storage/opensearch/__init__.py b/tests/unit/array/storage/opensearch/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
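The backend-specific unit tests start here. The first file, test_add.py, checks that re-extending with already-indexed IDs neither duplicates documents nor corrupts the offset2id index. A condensed sketch of the behavior under test (made-up index name, local OpenSearch instance assumed):

```python
import numpy as np
from docarray import Document, DocumentArray

# Sketch only: 'dedup_demo' is a hypothetical index name.
da = DocumentArray(storage='opensearch', config={'n_dim': 3, 'index_name': 'dedup_demo'})

with da:
    da.extend([Document(id=f'r{i}', embedding=np.ones(3) * i) for i in range(5)])
with da:
    # r0 and r4 already exist; only r5 and r6 are genuinely new.
    da.extend([Document(id=i, embedding=np.zeros(3)) for i in ('r0', 'r4', 'r5', 'r6')])

assert len(da) == 7  # five originals plus the two new IDs
```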
diff --git a/tests/unit/array/storage/opensearch/test_add.py b/tests/unit/array/storage/opensearch/test_add.py
new file mode 100644
index 00000000000..9c5b1d0ff14
--- /dev/null
+++ b/tests/unit/array/storage/opensearch/test_add.py
@@ -0,0 +1,149 @@
+from docarray import Document, DocumentArray
+import numpy as np
+
+import pytest
+
+
+@pytest.mark.filterwarnings('ignore::UserWarning')
+@pytest.mark.parametrize('columns', [[('price', 'int')], {'price': 'int'}])
+def test_add_ignore_existing_doc_id(start_storage, columns):
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+            'index_name': 'test_add_ignore_existing_doc_id',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id='r0', embedding=[0, 0, 0]),
+                Document(id='r1', embedding=[1, 1, 1]),
+                Document(id='r2', embedding=[2, 2, 2]),
+                Document(id='r3', embedding=[3, 3, 3]),
+                Document(id='r4', embedding=[4, 4, 4]),
+            ]
+        )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id='r0', embedding=[0, 0, 0]),
+                Document(id='r2', embedding=[2, 2, 2]),
+                Document(id='r4', embedding=[4, 4, 4]),
+                Document(id='r5', embedding=[2, 2, 2]),
+                Document(id='r6', embedding=[4, 4, 4]),
+            ]
+        )
+
+    indexed_offset_count = opensearch_doc._client.count(
+        index=opensearch_doc._index_name_offset2id
+    )['count']
+
+    assert len(opensearch_doc) == len(opensearch_doc[:, 'embedding'])
+    assert len(opensearch_doc) == indexed_offset_count
+    assert len(opensearch_doc[:, 'embedding']) == 7
+
+
+@pytest.mark.filterwarnings('ignore::UserWarning')
+@pytest.mark.parametrize('columns', [[('price', 'int')], {'price': 'int'}])
+def test_add_skip_wrong_data_type_and_fix_offset(start_storage, columns):
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'index_name': 'test_add_skip_wrong_data_type_and_fix_offset',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id='0', price=1000),
+                Document(id='1', price=20000),
+                Document(id='2', price=103000),
+            ]
+        )
+
+    with pytest.raises(IndexError):
+        with opensearch_doc:
+            opensearch_doc.extend(
+                [
+                    Document(id='0', price=10000),
+                    Document(id='1', price=20000),
+                    Document(id='3', price=30000),
+                    Document(id='4', price=100000000000),  # overflows int32
+                    Document(id='5', price=2000),
+                    Document(id='6', price=100000000000),  # overflows int32
+                    Document(id='7', price=30000),
+                ]
+            )
+
+    expected_ids = ['0', '1', '2', '3', '5', '7']
+
+    assert len(opensearch_doc) == 6
+    assert len(opensearch_doc[:, 'id']) == 6
+    assert opensearch_doc[:, 'id'] == expected_ids
+    assert opensearch_doc._offset2ids.ids == expected_ids
+
+
+@pytest.mark.filterwarnings('ignore::UserWarning')
+@pytest.mark.parametrize("assert_customization_propagation", [True, False])
+@pytest.mark.parametrize(
+    'columns',
+    [
+        [
+            ('is_true', 'bool'),
+            ('test_long', 'long'),
+            ('test_double', 'double'),
+        ],
+        {'is_true': 'bool', 'test_long': 'long', 'test_double': 'double'},
+    ],
+)
+def test_success_add_bulk_custom_params(
+    monkeypatch, start_storage, assert_customization_propagation, columns
+):
+    bulk_custom_params = {
+        'thread_count': 4,
+        'chunk_size': 100,
+        'max_chunk_bytes': 104857600,
+        'queue_size': 4,
+    }
+    nrof_docs = 100
+
+    def _mock_send_requests(requests, **kwargs):
+        # Currently only the extend method's call to self._send_requests
+        # receives the customization
+        if (
+            not requests[0]['_index'].startswith('offset2id__')
+            and requests[0]['_op_type'] == 'index'
+        ):
+            assert kwargs == bulk_custom_params
+
+        return [{'index': {'_id': f'r{i}'}} for i in range(nrof_docs)]
+
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+            'index_name': 'test_success_add_bulk_custom_params',
+        },
+    )
+
+    if assert_customization_propagation:
+        monkeypatch.setattr(opensearch_doc, '_send_requests', _mock_send_requests)
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id=f'r{i}', embedding=np.ones((3,)) * i)
+                for i in range(nrof_docs)
+            ],
+            **bulk_custom_params,
+        )
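The next file, test_data_type.py, pins down how declared column types round-trip through document tags. A sketch of the relationship between the config's `columns` and `Document` tags, assuming the type names follow the backend's `TYPE_MAP`:

```python
from docarray import Document, DocumentArray

# Sketch: each declared column becomes a typed field in the index mapping;
# undeclared tags are still stored, just without an explicit type.
da = DocumentArray(
    storage='opensearch',
    config={
        'n_dim': 3,
        'columns': {'test_bool': 'bool', 'test_long': 'long', 'test_double': 'double'},
    },
)

with da:
    # Extra keyword arguments on Document land in .tags
    da.append(Document(id='1', test_bool=True, test_long=372_036_854_775_807))

assert da['1'].tags['test_bool'] is True
```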
diff --git a/tests/unit/array/storage/opensearch/test_data_type.py b/tests/unit/array/storage/opensearch/test_data_type.py
new file mode 100644
index 00000000000..d29af545ae8
--- /dev/null
+++ b/tests/unit/array/storage/opensearch/test_data_type.py
@@ -0,0 +1,41 @@
+import pytest
+from docarray import DocumentArray, Document
+
+
+@pytest.mark.parametrize(
+    'columns',
+    [
+        [
+            ('test_bool', 'bool'),
+            ('test_long', 'long'),
+            ('test_double', 'double'),
+        ],
+        {'test_bool': 'bool', 'test_long': 'long', 'test_double': 'double'},
+    ],
+)
+def test_data_type(start_storage, columns):
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+            'index_name': 'test_data_type',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(
+                    id=1,
+                    test_bool=True,
+                    test_long=372_036_854_775_807,
+                    test_double=1_000_000_000_000_000,
+                )
+            ]
+        )
+
+    assert opensearch_doc[0].tags['test_bool'] is True
+    assert opensearch_doc[0].tags['test_long'] == 372_036_854_775_807
+    assert opensearch_doc[0].tags['test_double'] == 1_000_000_000_000_000
diff --git a/tests/unit/array/storage/opensearch/test_del.py b/tests/unit/array/storage/opensearch/test_del.py
new file mode 100644
index 00000000000..b60044b2456
--- /dev/null
+++ b/tests/unit/array/storage/opensearch/test_del.py
@@ -0,0 +1,94 @@
+import pytest
+
+from docarray import Document, DocumentArray
+
+
+@pytest.mark.filterwarnings('ignore::UserWarning')
+@pytest.mark.parametrize('deleted_elmnts', [[0, 1], ['r0', 'r1']])
+@pytest.mark.parametrize('columns', [[('price', 'int')], {'price': 'int'}])
+def test_delete_offset_success_sync_es_offset_index(
+    deleted_elmnts, start_storage, columns
+):
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id='r0', embedding=[0, 0, 0]),
+                Document(id='r1', embedding=[1, 1, 1]),
+                Document(id='r2', embedding=[2, 2, 2]),
+                Document(id='r3', embedding=[3, 3, 3]),
+                Document(id='r4', embedding=[4, 4, 4]),
+                Document(id='r5', embedding=[5, 5, 5]),
+                Document(id='r6', embedding=[6, 6, 6]),
+                Document(id='r7', embedding=[7, 7, 7]),
+            ]
+        )
+
+    expected_offset_after_del = ['r2', 'r3', 'r4', 'r5', 'r6', 'r7']
+
+    with opensearch_doc:
+        del opensearch_doc[deleted_elmnts]
+
+    indexed_offset_count = opensearch_doc._client.count(
+        index=opensearch_doc._index_name_offset2id
+    )['count']
+
+    assert len(opensearch_doc._offset2ids.ids) == indexed_offset_count
+    assert len(opensearch_doc._offset2ids.ids) == 6
+    assert len(opensearch_doc[:, 'embedding']) == 6
+
+    for id in expected_offset_after_del:
+        expected_offset = str(expected_offset_after_del.index(id))
+        actual_offset_index = opensearch_doc._client.search(
+            index=opensearch_doc._index_name_offset2id,
+            body={'query': {'match': {'blob': id}}},
+        )['hits']['hits'][0]['_id']
+        assert actual_offset_index == expected_offset
+
+
+@pytest.mark.filterwarnings('ignore::UserWarning')
+@pytest.mark.parametrize('columns', [[('price', 'int')], {'price': 'int'}])
+def test_success_handle_bulk_delete_not_found(start_storage, columns):
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+        },
+    )
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id='r0', embedding=[0, 0, 0]),
+                Document(id='r1', embedding=[1, 1, 1]),
+            ]
+        )
+
+    offset_index = opensearch_doc._index_name_offset2id
+
+    expected_to_fail_del_data = [
+        {
+            '_op_type': 'delete',
+            '_id': 0,  # offset data exists
+            '_index': offset_index,
+        },
+        {
+            '_op_type': 'delete',
+            '_id': 2,  # offset data does not exist, expected to fail
+            '_index': offset_index,
+        },
+    ]
+
+    info = opensearch_doc._send_requests(expected_to_fail_del_data)
+
+    assert len(info) == 1
+    assert 'delete' in info[0].keys()
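The next file, test_find.py, exercises vector search and pre-filtering. A usage sketch of the two entry points, assuming the `filter` argument takes a raw OpenSearch query clause (here a `match` on a declared string column):

```python
import numpy as np
from docarray import Document, DocumentArray

# Sketch only: 'G' is declared as a str column so it can be filtered on.
da = DocumentArray(storage='opensearch', config={'n_dim': 2, 'columns': {'G': 'str'}})

with da:
    da.extend(
        [
            Document(id=f'r{i}', embedding=np.random.rand(2), tags={'G': f'G{i}'})
            for i in range(10)
        ]
    )

hits = da.find(query=np.array([0.5, 0.5]))                        # pure vector search
filtered = da.find(filter={'match': {'G': 'G3'}})                 # filter-only query
both = da.find(query=np.array([0.5, 0.5]), filter={'match': {'G': 'G3'}})
```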
diff --git a/tests/unit/array/storage/opensearch/test_find.py b/tests/unit/array/storage/opensearch/test_find.py
new file mode 100644
index 00000000000..5facfb7ff54
--- /dev/null
+++ b/tests/unit/array/storage/opensearch/test_find.py
@@ -0,0 +1,107 @@
+from docarray import Document, DocumentArray
+import numpy as np
+
+
+def test_success_find_with_added_kwargs(start_storage, monkeypatch):
+    nrof_docs = 1000
+    num_candidates = 100
+
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'distance': 'l2',
+            'index_name': 'test_success_find_with_added_kwargs',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id=f'r{i}', embedding=np.ones((3,)) * i)
+                for i in range(nrof_docs)
+            ],
+        )
+
+    def _mock_knn_search(**kwargs):
+        assert kwargs['body']['size'] == num_candidates
+
+        return {'hits': {'hits': []}}
+
+    monkeypatch.setattr(opensearch_doc._client, 'search', _mock_knn_search)
+
+    np_query = np.array([2, 1, 3])
+
+    opensearch_doc.find(np_query, limit=num_candidates)
+
+
+def test_filter(start_storage):
+    import random
+    import string
+
+    opensearch_da = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 2,
+            'columns': {
+                'A': 'str',
+                'B': 'str',
+                'C': 'str',
+                'D': 'str',
+                'E': 'str',
+                'F': 'str',
+                'G': 'str',
+            },
+        },
+    )
+
+    def ran():
+        return ''.join(random.choices(string.ascii_uppercase + string.digits, k=10))
+
+    def ran_size():
+        sizes = ['S', 'M', 'L', 'XL']
+        return sizes[random.randint(0, len(sizes) - 1)]
+
+    def ran_type():
+        types = ['A', 'B', 'C', 'D']
+        return types[random.randint(0, len(types) - 1)]
+
+    def ran_stype():
+        stypes = ['SA', 'SB', 'SC', 'SD']
+        return stypes[random.randint(0, len(stypes) - 1)]
+
+    docs = DocumentArray(
+        [
+            Document(
+                id=f'r{i}',
+                embedding=np.random.rand(2),
+                tags={
+                    'A': ran(),
+                    'B': ran_stype(),
+                    'C': ran_size(),
+                    'D': ran_type(),
+                    'E': ran(),
+                    'F': ran_type(),
+                    'G': f'G{i}',
+                },
+            )
+            for i in range(50)
+        ]
+    )
+
+    with opensearch_da:
+        opensearch_da.extend(docs)
+    res = opensearch_da.find(query=Document(embedding=docs[0].embedding))
+    assert len(res) > 0
+    assert res[0][0].tags['G'] == 'G0'
+    filter_ = {'match': {'G': 'G3'}}
+
+    res = opensearch_da.find(filter=filter_)
+    assert len(res) > 0
+    assert res[0].tags['G'] == 'G3'
+
+    res = opensearch_da.find(
+        query=Document(embedding=docs[0].embedding), filter=filter_
+    )
+    assert len(res) > 0
+    assert res[0][0].tags['G'] == 'G3'
diff --git a/tests/unit/array/storage/opensearch/test_get.py b/tests/unit/array/storage/opensearch/test_get.py
new file mode 100644
index 00000000000..be505aa7531
--- /dev/null
+++ b/tests/unit/array/storage/opensearch/test_get.py
@@ -0,0 +1,55 @@
+import numpy as np
+import pytest
+
+from docarray import Document, DocumentArray
+
+
+@pytest.mark.parametrize('nrof_docs', [10, 100, 10_000, 10_100, 20_000, 20_100])
+@pytest.mark.parametrize('columns', [[('price', 'int')], {'price': 'int'}])
+def test_success_get_bulk_data(start_storage, nrof_docs, columns):
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id=f'r{i}', embedding=np.ones((3,)) * i)
+                for i in range(nrof_docs)
+            ]
+        )
+
+    assert len(opensearch_doc[:, 'id']) == nrof_docs
+
+
+@pytest.mark.parametrize('columns', [[('price', 'int')], {'price': 'int'}])
+def test_error_get_bulk_data_id_not_exist(start_storage, columns):
+    nrof_docs = 10
+
+    opensearch_doc = DocumentArray(
+        storage='opensearch',
+        config={
+            'n_dim': 3,
+            'columns': columns,
+            'distance': 'l2',
+        },
+    )
+
+    with opensearch_doc:
+        opensearch_doc.extend(
+            [
+                Document(id=f'r{i}', embedding=np.ones((3,)) * i)
+                for i in range(nrof_docs)
+            ]
+        )
+
+    with pytest.raises(KeyError) as e:
+        opensearch_doc[['r1', 'r11', 'r21'], 'id']
+
+    assert e.value.args[0] == ['r11', 'r21']
+    assert len(e.value.args[1]) == 1
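The next diff, test_advance_indexing.py, is dominated by a quote-style conversion (single to double quotes); the functional change is only the added `opensearch` entry in each parametrize list. The access patterns those tests re-run against the new backend look roughly like this sketch (local instance assumed):

```python
from docarray import Document, DocumentArray

# Sketch of the indexing semantics exercised below, against OpenSearch.
da = DocumentArray(storage='opensearch', config={'n_dim': 123})
with da:
    da.extend([Document(text=str(i)) for i in range(100)])

assert da[0].text == '0'            # integer offset
assert da[da[99].id].text == '99'   # string index by document id
assert len(da[1:100:5]) == 20       # slice
del da[0:5]                         # slice deletion
```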
"storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_getter_int_str(docs, storage, config, start_storage): @@ -40,33 +42,34 @@ def test_getter_int_str(docs, storage, config, start_storage): else: docs = DocumentArray(docs, storage=storage) # getter - assert docs[99].text == '99' - assert docs[np.int(99)].text == '99' - assert docs[-1].text == '99' - assert docs[0].text == '0' + assert docs[99].text == "99" + assert docs[np.int(99)].text == "99" + assert docs[-1].text == "99" + assert docs[0].text == "0" # string index - assert docs[docs[0].id].text == '0' - assert docs[docs[99].id].text == '99' - assert docs[docs[-1].id].text == '99' + assert docs[docs[0].id].text == "0" + assert docs[docs[99].id].text == "99" + assert docs[docs[-1].id].text == "99" with pytest.raises(IndexError): docs[100] with pytest.raises(KeyError): - docs['adsad'] + docs["adsad"] @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_setter_int_str(docs, storage, config, start_storage): @@ -75,30 +78,31 @@ def test_setter_int_str(docs, storage, config, start_storage): else: docs = DocumentArray(docs, storage=storage) # setter - docs[99] = Document(text='hello') - docs[0] = Document(text='world') + docs[99] = Document(text="hello") + docs[0] = Document(text="world") - assert docs[99].text == 'hello' - assert docs[-1].text == 'hello' - assert docs[0].text == 'world' + assert docs[99].text == "hello" + assert docs[-1].text == "hello" + assert docs[0].text == "world" - docs[docs[2].id] = Document(text='doc2') + docs[docs[2].id] = Document(text="doc2") # string index - assert docs[docs[2].id].text == 'doc2' + assert docs[docs[2].id].text == "doc2" @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", 
AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_del_int_str(docs, storage, config, start_storage, indices): @@ -125,17 +129,18 @@ def test_del_int_str(docs, storage, config, start_storage, indices): @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_slice(docs, storage, config, start_storage): @@ -148,12 +153,12 @@ def test_slice(docs, storage, config, start_storage): assert len(docs[1:100:5]) == 20 # 1 to 100, sep with 5 # setter - with pytest.raises(TypeError, match='an iterable'): - docs[1:5] = Document(text='repl') + with pytest.raises(TypeError, match="an iterable"): + docs[1:5] = Document(text="repl") - docs[1:5] = [Document(text=f'repl{j}') for j in range(4)] + docs[1:5] = [Document(text=f"repl{j}") for j in range(4)] for d in docs[1:5]: - assert d.text.startswith('repl') + assert d.text.startswith("repl") assert len(docs) == 100 # del @@ -166,17 +171,18 @@ def test_slice(docs, storage, config, start_storage): @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_sequence_bool_index(docs, storage, config, start_storage): @@ -190,21 +196,21 @@ def test_sequence_bool_index(docs, storage, config, start_storage): # setter mask = [True, False] * 50 - docs[mask, 'text'] = [f'repl{j}' for j in range(50)] + docs[mask, "text"] = [f"repl{j}" for j in range(50)] for idx, d in enumerate(docs): if idx % 2 == 0: # got replaced - assert d.text.startswith('repl') + assert d.text.startswith("repl") else: assert d.text == str(idx) - docs[mask] = [Document(text='test') for _ in range(50)] + docs[mask] = [Document(text="test") for _ in range(50)] for idx, d in enumerate(docs): if idx % 2 == 0: # got replaced - assert d.text == 'test' + assert d.text == "test" else: assert d.text == str(idx) @@ -213,19 +219,20 @@ def test_sequence_bool_index(docs, 
storage, config, start_storage): assert len(docs) == 50 -@pytest.mark.parametrize('nparray', [lambda x: x, np.array, tuple]) +@pytest.mark.parametrize("nparray", [lambda x: x, np.array, tuple]) @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_sequence_int(docs, nparray, storage, config, start_storage): @@ -238,33 +245,34 @@ def test_sequence_int(docs, nparray, storage, config, start_storage): assert len(docs[idx]) == len(idx) # setter - docs[idx] = [Document(text='repl') for _ in range(len(idx))] + docs[idx] = [Document(text="repl") for _ in range(len(idx))] for _id in idx: - assert docs[_id].text == 'repl' + assert docs[_id].text == "repl" # del idx = [-3, -4, -5, 9, 10, 11] del docs[idx] assert len(docs) == 100 - len(idx) - docs[1, 5, 9] = [Document(text='new') for _ in range(3)] - assert docs[1].text == 'new' - assert docs[5].text == 'new' - assert docs[9].text == 'new' + docs[1, 5, 9] = [Document(text="new") for _ in range(3)] + assert docs[1].text == "new" + assert docs[5].text == "new" + assert docs[9].text == "new" @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_sequence_str(docs, storage, config, start_storage): @@ -280,10 +288,10 @@ def test_sequence_str(docs, storage, config, start_storage): # setter idx = [d.id for d in docs[1, 3, 5, 7, -1, -2]] - docs[idx] = [Document(text='repl') for _ in range(len(idx))] + docs[idx] = [Document(text="repl") for _ in range(len(idx))] idx = [d.id for d in docs[1, 3, 5, 7, -1, -2]] for _id in idx: - assert docs[_id].text == 'repl' + assert docs[_id].text == "repl" # del idx = [d.id for d in docs[-3, -4, -5, 9, 10, 11]] @@ -292,17 +300,18 @@ def test_sequence_str(docs, storage, config, start_storage): @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', 
RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_docarray_list_tuple(docs, storage, config, start_storage): @@ -315,17 +324,18 @@ def test_docarray_list_tuple(docs, storage, config, start_storage): @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), ], ) def test_path_syntax_indexing(storage, config, start_storage): @@ -336,159 +346,174 @@ def test_path_syntax_indexing(storage, config, start_storage): for c in d.chunks: c.chunks = DocumentArray.empty(3) - if not storage == 'memory': + if not storage == "memory": if config: da = DocumentArray(da, storage=storage, config=config) else: da = DocumentArray(da, storage=storage) with da: - assert len(da['@c']) == 3 * 5 - assert len(da['@c:1']) == 3 - assert len(da['@c-1:']) == 3 - assert len(da['@c1']) == 3 - assert len(da['@c-2:']) == 3 * 2 - assert len(da['@c1:3']) == 3 * 2 - assert len(da['@c1:3c']) == (3 * 2) * 3 - assert len(da['@c1:3,c1:3c']) == (3 * 2) + (3 * 2) * 3 - assert len(da['@c 1:3 , c 1:3 c']) == (3 * 2) + (3 * 2) * 3 - assert len(da['@cc']) == 3 * 5 * 3 - assert len(da['@cc,m']) == 3 * 5 * 3 + 3 * 7 - assert len(da['@r:1cc,m']) == 1 * 5 * 3 + 3 * 7 + assert len(da["@c"]) == 3 * 5 + assert len(da["@c:1"]) == 3 + assert len(da["@c-1:"]) == 3 + assert len(da["@c1"]) == 3 + assert len(da["@c-2:"]) == 3 * 2 + assert len(da["@c1:3"]) == 3 * 2 + assert len(da["@c1:3c"]) == (3 * 2) * 3 + assert len(da["@c1:3,c1:3c"]) == (3 * 2) + (3 * 2) * 3 + assert len(da["@c 1:3 , c 1:3 c"]) == (3 * 2) + (3 * 2) * 3 + assert len(da["@cc"]) == 3 * 5 * 3 + assert len(da["@cc,m"]) == 3 * 5 * 3 + 3 * 7 + assert len(da["@r:1cc,m"]) == 1 * 5 * 3 + 3 * 7 @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), + ("opensearch", 
OpenSearchConfig(n_dim=123)), ], ) -@pytest.mark.parametrize('use_subindex', [False, True]) +@pytest.mark.parametrize("use_subindex", [False, True]) def test_path_syntax_indexing_set(storage, config, use_subindex, start_storage): da = DocumentArray.empty(3) for i, d in enumerate(da): d.chunks = DocumentArray.empty(5) - d.matches = DocumentArray([Document(id=f'm{j + (i * 7)}') for j in range(7)]) + d.matches = DocumentArray([Document(id=f"m{j + (i * 7)}") for j in range(7)]) for c in d.chunks: c.chunks = DocumentArray.empty(3) repeat = lambda s, l: [s] * l - da['@r,c,m,cc', 'text'] = repeat('a', 3 + 5 * 3 + 7 * 3 + 3 * 5 * 3) + da["@r,c,m,cc", "text"] = repeat("a", 3 + 5 * 3 + 7 * 3 + 3 * 5 * 3) if config: da = DocumentArray( da, storage=storage, config=config, - subindex_configs={'@c': {'n_dim': 123}} if use_subindex else None, + subindex_configs={"@c": {"n_dim": 123}} if use_subindex else None, ) else: da = DocumentArray( - da, storage=storage, subindex_configs={'@c': None} if use_subindex else None + da, storage=storage, subindex_configs={"@c": None} if use_subindex else None ) with da: - assert da['@c'].texts == repeat('a', 3 * 5) - assert da['@c', 'text'] == repeat('a', 3 * 5) + assert da["@c"].texts == repeat("a", 3 * 5) + assert da["@c", "text"] == repeat("a", 3 * 5) if use_subindex: - assert da._subindices['@c'].texts == repeat('a', 3 * 5) - assert da['@c:1', 'text'] == repeat('a', 3) - assert da['@c-1:', 'text'] == repeat('a', 3) - assert da['@c1', 'text'] == repeat('a', 3) - assert da['@c-2:', 'text'] == repeat('a', 3 * 2) - assert da['@c1:3', 'text'] == repeat('a', 3 * 2) - assert da['@c1:3c', 'text'] == repeat('a', (3 * 2) * 3) - assert da['@c1:3,c1:3c', 'text'] == repeat('a', (3 * 2) + (3 * 2) * 3) - assert da['@c 1:3 , c 1:3 c', 'text'] == repeat('a', (3 * 2) + (3 * 2) * 3) - assert da['@cc', 'text'] == repeat('a', 3 * 5 * 3) - assert da['@cc,m', 'text'] == repeat('a', 3 * 5 * 3 + 3 * 7) - assert da['@r:1cc,m', 'text'] == repeat('a', 1 * 5 * 3 + 3 * 7) - assert da[0, 'text'] == 'a' - assert da[[True for _ in da], 'text'] == repeat('a', 3) - - da['@m,cc', 'text'] = repeat('b', 3 + 5 * 3 + 7 * 3 + 3 * 5 * 3) + assert da._subindices["@c"].texts == repeat("a", 3 * 5) + assert da["@c:1", "text"] == repeat("a", 3) + assert da["@c-1:", "text"] == repeat("a", 3) + assert da["@c1", "text"] == repeat("a", 3) + assert da["@c-2:", "text"] == repeat("a", 3 * 2) + assert da["@c1:3", "text"] == repeat("a", 3 * 2) + assert da["@c1:3c", "text"] == repeat("a", (3 * 2) * 3) + assert da["@c1:3,c1:3c", "text"] == repeat("a", (3 * 2) + (3 * 2) * 3) + assert da["@c 1:3 , c 1:3 c", "text"] == repeat("a", (3 * 2) + (3 * 2) * 3) + assert da["@cc", "text"] == repeat("a", 3 * 5 * 3) + assert da["@cc,m", "text"] == repeat("a", 3 * 5 * 3 + 3 * 7) + assert da["@r:1cc,m", "text"] == repeat("a", 1 * 5 * 3 + 3 * 7) + assert da[0, "text"] == "a" + assert da[[True for _ in da], "text"] == repeat("a", 3) + + da["@m,cc", "text"] = repeat("b", 3 + 5 * 3 + 7 * 3 + 3 * 5 * 3) with da: - assert da['@c', 'text'] == repeat('a', 3 * 5) + assert da["@c", "text"] == repeat("a", 3 * 5) if use_subindex: - assert da._subindices['@c'].texts == repeat('a', 3 * 5) - assert da['@c:1', 'text'] == repeat('a', 3) - assert da['@c-1:', 'text'] == repeat('a', 3) - assert da['@c1', 'text'] == repeat('a', 3) - assert da['@c-2:', 'text'] == repeat('a', 3 * 2) - assert da['@c1:3', 'text'] == repeat('a', 3 * 2) - assert da['@c1:3c', 'text'] == repeat('b', (3 * 2) * 3) - assert da['@c1:3,c1:3c', 'text'] == repeat('a', (3 * 2)) + repeat( - 
'b', (3 * 2) * 3 + assert da._subindices["@c"].texts == repeat("a", 3 * 5) + assert da["@c:1", "text"] == repeat("a", 3) + assert da["@c-1:", "text"] == repeat("a", 3) + assert da["@c1", "text"] == repeat("a", 3) + assert da["@c-2:", "text"] == repeat("a", 3 * 2) + assert da["@c1:3", "text"] == repeat("a", 3 * 2) + assert da["@c1:3c", "text"] == repeat("b", (3 * 2) * 3) + assert da["@c1:3,c1:3c", "text"] == repeat("a", (3 * 2)) + repeat( + "b", (3 * 2) * 3 ) - assert da['@c 1:3 , c 1:3 c', 'text'] == repeat('a', (3 * 2)) + repeat( - 'b', (3 * 2) * 3 + assert da["@c 1:3 , c 1:3 c", "text"] == repeat("a", (3 * 2)) + repeat( + "b", (3 * 2) * 3 ) - assert da['@cc', 'text'] == repeat('b', 3 * 5 * 3) - assert da['@cc,m', 'text'] == repeat('b', 3 * 5 * 3 + 3 * 7) - assert da['@r:1cc,m', 'text'] == repeat('b', 1 * 5 * 3 + 3 * 7) - assert da[0, 'text'] == 'a' - assert da[[True for _ in da], 'text'] == repeat('a', 3) + assert da["@cc", "text"] == repeat("b", 3 * 5 * 3) + assert da["@cc,m", "text"] == repeat("b", 3 * 5 * 3 + 3 * 7) + assert da["@r:1cc,m", "text"] == repeat("b", 1 * 5 * 3 + 3 * 7) + assert da[0, "text"] == "a" + assert da[[True for _ in da], "text"] == repeat("a", 3) - da[1, 'text'] = 'd' - assert da[1, 'text'] == 'd' - assert da[1].text == 'd' + da[1, "text"] = "d" + assert da[1, "text"] == "d" + assert da[1].text == "d" doc_id = da[0].id - da[doc_id, 'text'] = 'e' - assert da[doc_id, 'text'] == 'e' - assert da[doc_id].text == 'e' + da[doc_id, "text"] = "e" + assert da[doc_id, "text"] == "e" + assert da[doc_id].text == "e" # setting matches is only possible if the IDs are the same with da: - da['@m'] = [Document(id=f'm{i}', text='c') for i in range(3 * 7)] - assert da['@m', 'text'] == repeat('c', 3 * 7) + da["@m"] = [Document(id=f"m{i}", text="c") for i in range(3 * 7)] + assert da["@m", "text"] == repeat("c", 3 * 7) # setting by traversal paths with different IDs is not supported with pytest.raises(ValueError): - da['@m'] = [Document() for _ in range(3 * 7)] + da["@m"] = [Document() for _ in range(3 * 7)] - da[2, ['text', 'id']] = ['new_text', 'new_id'] - assert da[2].text == 'new_text' - assert da[2].id == 'new_id' + da[2, ["text", "id"]] = ["new_text", "new_id"] + assert da[2].text == "new_text" + assert da[2].id == "new_id" -def test_getset_subindex(): +@pytest.mark.parametrize( + "storage,config", + [ + ("memory", None), + ("sqlite", None), + ("weaviate", WeaviateConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), + ], +) +def test_getset_subindex(storage, config, start_storage): da = DocumentArray( [Document(chunks=[Document() for _ in range(5)]) for _ in range(3)], - subindex_configs={'@c': None}, + subindex_configs={"@c": None}, ) with da: - assert len(da['@c']) == 15 - assert len(da._subindices['@c']) == 15 + assert len(da["@c"]) == 15 + assert len(da._subindices["@c"]) == 15 # set entire subindex - chunks_ids = [c.id for c in da['@c']] - new_chunks = [Document(id=cid, text=f'{i}') for i, cid in enumerate(chunks_ids)] - da['@c'] = new_chunks + chunks_ids = [c.id for c in da["@c"]] + new_chunks = [Document(id=cid, text=f"{i}") for i, cid in enumerate(chunks_ids)] + da["@c"] = new_chunks new_chunks = DocumentArray(new_chunks) - assert da['@c'] == new_chunks - assert da._subindices['@c'] == new_chunks + assert da["@c"] == new_chunks + assert 
da._subindices["@c"] == new_chunks collected_chunks = DocumentArray.empty(0) for d in da: collected_chunks.extend(d.chunks) assert collected_chunks == new_chunks # set part of a subindex - chunks_ids = [c.id for c in da['@c:3']] + chunks_ids = [c.id for c in da["@c:3"]] new_chunks = [ - Document(id=cid, text=f'{2*i}') for i, cid in enumerate(chunks_ids) + Document(id=cid, text=f"{2*i}") for i, cid in enumerate(chunks_ids) ] - da['@c:3'] = new_chunks + da["@c:3"] = new_chunks new_chunks = DocumentArray(new_chunks) - assert da['@c:3'] == new_chunks + assert da["@c:3"] == new_chunks for d in new_chunks: - assert d in da._subindices['@c'] + assert d in da._subindices["@c"] collected_chunks = DocumentArray.empty(0) for d in da: collected_chunks.extend(d.chunks[:3]) @@ -496,17 +521,18 @@ def test_getset_subindex(): @pytest.mark.parametrize( - 'storage,config,subindex_config', + "storage,config,subindex_config", [ - ('memory', None, None), - ('sqlite', None, None), - ('weaviate', WeaviateConfig(n_dim=123), {'n_dim': 123}), - ('annlite', AnnliteConfig(n_dim=123), {'n_dim': 123}), - ('qdrant', QdrantConfig(n_dim=123), {'n_dim': 123}), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True), {'n_dim': 123}), - ('elasticsearch', ElasticConfig(n_dim=123), {'n_dim': 123}), - ('redis', RedisConfig(n_dim=123), {'n_dim': 123}), - ('milvus', MilvusConfig(n_dim=123), {'n_dim': 123}), + ("memory", None, None), + ("sqlite", None, None), + ("weaviate", WeaviateConfig(n_dim=123), None), + ("annlite", AnnliteConfig(n_dim=123), None), + ("qdrant", QdrantConfig(n_dim=123), None), + ("qdrant", QdrantConfig(n_dim=123, prefer_grpc=True), None), + ("elasticsearch", ElasticConfig(n_dim=123), None), + ("redis", RedisConfig(n_dim=123), None), + ("opensearch", OpenSearchConfig(n_dim=123), None), + ("milvus", MilvusConfig(n_dim=123), {"n_dim": 123}), ], ) def test_getset_subindex_in_store(storage, config, subindex_config, start_storage): @@ -514,36 +540,37 @@ def test_getset_subindex_in_store(storage, config, subindex_config, start_storag [Document(chunks=[Document() for _ in range(5)]) for _ in range(3)], storage=storage, config=config, - subindex_configs={'@c': subindex_config}, + subindex_configs={"@c": subindex_config}, ) with da: - assert len(da['@c']) == 15 - assert len(da._subindices['@c']) == 15 + assert len(da["@c"]) == 15 + assert len(da._subindices["@c"]) == 15 - chunks_ids = [c.id for c in da['@c']] + chunks_ids = [c.id for c in da["@c"]] new_chunks = [ Document(id=cid, embedding=np.ones(123) * i) for i, cid in enumerate(chunks_ids) ] - da['@c'] = new_chunks + da["@c"] = new_chunks - res = da.find(np.random.random(123), on='@c') + res = da.find(np.random.random(123), on="@c") assert len(res) > 0 -@pytest.mark.parametrize('size', [1, 5]) +@pytest.mark.parametrize("size", [1, 5]) @pytest.mark.parametrize( - 'storage,config_gen', + "storage,config_gen", [ - ('memory', None), - ('sqlite', None), - ('weaviate', lambda: WeaviateConfig(n_dim=123)), - ('annlite', lambda: AnnliteConfig(n_dim=123)), - ('qdrant', lambda: QdrantConfig(n_dim=123)), - ('qdrant', lambda: QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', lambda: ElasticConfig(n_dim=123)), - ('redis', lambda: RedisConfig(n_dim=123)), - ('milvus', lambda: MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", lambda: WeaviateConfig(n_dim=123)), + ("annlite", lambda: AnnliteConfig(n_dim=123)), + ("qdrant", lambda: QdrantConfig(n_dim=123)), + ("qdrant", lambda: QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", 
lambda: ElasticConfig(n_dim=123)), + ("redis", lambda: RedisConfig(n_dim=123)), + ("milvus", lambda: MilvusConfig(n_dim=123)), + ("opensearch", lambda: OpenSearchConfig(n_dim=123)), ], ) def test_attribute_indexing(storage, config_gen, start_storage, size): @@ -553,38 +580,39 @@ def test_attribute_indexing(storage, config_gen, start_storage, size): da = DocumentArray(storage=storage) da.extend(DocumentArray.empty(size)) - for v in da[:, 'id']: + for v in da[:, "id"]: assert v - da[:, 'mime_type'] = [f'type {j}' for j in range(size)] - for v in da[:, 'mime_type']: + da[:, "mime_type"] = [f"type {j}" for j in range(size)] + for v in da[:, "mime_type"]: assert v - del da[:, 'mime_type'] - for v in da[:, 'mime_type']: + del da[:, "mime_type"] + for v in da[:, "mime_type"]: assert not v - da[:, ['text', 'mime_type']] = [ - [f'hello {j}' for j in range(size)], - [f'type {j}' for j in range(size)], + da[:, ["text", "mime_type"]] = [ + [f"hello {j}" for j in range(size)], + [f"type {j}" for j in range(size)], ] da.summary() - for v in da[:, ['mime_type', 'text']]: + for v in da[:, ["mime_type", "text"]]: for vv in v: assert vv @pytest.mark.parametrize( - 'storage,config_gen', + "storage,config_gen", [ - ('memory', None), - ('sqlite', None), - ('weaviate', lambda: WeaviateConfig(n_dim=10)), - ('annlite', lambda: AnnliteConfig(n_dim=10)), - ('qdrant', lambda: QdrantConfig(n_dim=10)), - ('qdrant', lambda: QdrantConfig(n_dim=10, prefer_grpc=True)), - ('elasticsearch', lambda: ElasticConfig(n_dim=10)), - ('redis', lambda: RedisConfig(n_dim=10)), - ('milvus', lambda: MilvusConfig(n_dim=10)), + ("memory", None), + ("sqlite", None), + ("weaviate", lambda: WeaviateConfig(n_dim=10)), + ("annlite", lambda: AnnliteConfig(n_dim=10)), + ("qdrant", lambda: QdrantConfig(n_dim=10)), + ("qdrant", lambda: QdrantConfig(n_dim=10, prefer_grpc=True)), + ("elasticsearch", lambda: ElasticConfig(n_dim=10)), + ("redis", lambda: RedisConfig(n_dim=10)), + ("milvus", lambda: MilvusConfig(n_dim=10)), + ("opensearch", lambda: OpenSearchConfig(n_dim=10)), ], ) def test_tensor_attribute_selector(storage, config_gen, start_storage): @@ -601,18 +629,18 @@ def test_tensor_attribute_selector(storage, config_gen, start_storage): da.extend(DocumentArray.empty(3)) - da[:, 'embedding'] = sp_embed + da[:, "embedding"] = sp_embed - assert da[:, 'embedding'].shape == (3, 10) + assert da[:, "embedding"].shape == (3, 10) for d in da: assert d.embedding.shape == (1, 10) - v1, v2 = da[:, ['embedding', 'id']] + v1, v2 = da[:, ["embedding", "id"]] assert isinstance(v1, scipy.sparse.coo_matrix) assert isinstance(v2, list) - v1, v2 = da[:, ['id', 'embedding']] + v1, v2 = da[:, ["id", "embedding"]] assert isinstance(v2, scipy.sparse.coo_matrix) assert isinstance(v1, list) @@ -620,10 +648,10 @@ def test_tensor_attribute_selector(storage, config_gen, start_storage): # TODO: since match function is not implemented, this test will # not work with weaviate storage atm, will be addressed in # next version -@pytest.mark.parametrize('storage', ['memory', 'sqlite', 'annlite']) +@pytest.mark.parametrize("storage", ["memory", "sqlite", "annlite"]) def test_advance_selector_mixed(storage): - if storage == 'annlite': - da = DocumentArray(storage=storage, config={'n_dim': 3}) + if storage == "annlite": + da = DocumentArray(storage=storage, config={"n_dim": 3}) else: da = DocumentArray(storage=storage) @@ -632,22 +660,23 @@ def test_advance_selector_mixed(storage): da.match(da, exclude_self=True) - assert len(da[:, ('id', 'embedding', 'matches')]) == 3 - assert 
len(da[:, ('id', 'embedding', 'matches')][0]) == 10 + assert len(da[:, ("id", "embedding", "matches")]) == 3 + assert len(da[:, ("id", "embedding", "matches")][0]) == 10 @pytest.mark.parametrize( - 'storage,config_gen', + "storage,config_gen", [ - ('memory', None), - ('sqlite', None), - ('weaviate', lambda: WeaviateConfig(n_dim=10)), - ('annlite', lambda: AnnliteConfig(n_dim=10)), - ('qdrant', lambda: QdrantConfig(n_dim=10)), - ('qdrant', lambda: QdrantConfig(n_dim=10, prefer_grpc=True)), - ('elasticsearch', lambda: ElasticConfig(n_dim=10)), - ('redis', lambda: RedisConfig(n_dim=10)), - ('milvus', lambda: MilvusConfig(n_dim=10)), + ("memory", None), + ("sqlite", None), + ("weaviate", lambda: WeaviateConfig(n_dim=10)), + ("annlite", lambda: AnnliteConfig(n_dim=10)), + ("qdrant", lambda: QdrantConfig(n_dim=10)), + ("qdrant", lambda: QdrantConfig(n_dim=10, prefer_grpc=True)), + ("elasticsearch", lambda: ElasticConfig(n_dim=10)), + ("redis", lambda: RedisConfig(n_dim=10)), + ("milvus", lambda: MilvusConfig(n_dim=10)), + ("opensearch", lambda: OpenSearchConfig(n_dim=10)), ], ) def test_single_boolean_and_padding(storage, config_gen, start_storage): @@ -672,17 +701,18 @@ def test_single_boolean_and_padding(storage, config_gen, start_storage): @pytest.mark.parametrize( - 'storage,config_gen', + "storage,config_gen", [ - ('memory', None), - ('sqlite', None), - ('weaviate', lambda: WeaviateConfig(n_dim=123)), - ('annlite', lambda: AnnliteConfig(n_dim=123)), - ('qdrant', lambda: QdrantConfig(n_dim=123)), - ('qdrant', lambda: QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', lambda: ElasticConfig(n_dim=123)), - ('redis', lambda: RedisConfig(n_dim=123)), - ('milvus', lambda: MilvusConfig(n_dim=123)), + ("memory", None), + ("sqlite", None), + ("weaviate", lambda: WeaviateConfig(n_dim=123)), + ("annlite", lambda: AnnliteConfig(n_dim=123)), + ("qdrant", lambda: QdrantConfig(n_dim=123)), + ("qdrant", lambda: QdrantConfig(n_dim=123, prefer_grpc=True)), + ("elasticsearch", lambda: ElasticConfig(n_dim=123)), + ("redis", lambda: RedisConfig(n_dim=123)), + ("milvus", lambda: MilvusConfig(n_dim=123)), + ("opensearch", lambda: OpenSearchConfig(n_dim=123)), ], ) def test_edge_case_two_strings(storage, config_gen, start_storage): @@ -691,75 +721,76 @@ def test_edge_case_two_strings(storage, config_gen, start_storage): da = DocumentArray(storage=storage, config=config_gen()) else: da = DocumentArray(storage=storage) - da.extend([Document(id='1'), Document(id='2'), Document(id='3')]) - assert da['1', 'id'] == '1' - assert len(da['1', '2']) == 2 - assert isinstance(da['1', '2'], DocumentArray) + da.extend([Document(id="1"), Document(id="2"), Document(id="3")]) + assert da["1", "id"] == "1" + assert len(da["1", "2"]) == 2 + assert isinstance(da["1", "2"], DocumentArray) with pytest.raises(KeyError): - da['hello', '2'] + da["hello", "2"] with pytest.raises(AttributeError): - da['1', 'hello'] - assert len(da['1', '2', '3']) == 3 - assert isinstance(da['1', '2', '3'], DocumentArray) + da["1", "hello"] + assert len(da["1", "2", "3"]) == 3 + assert isinstance(da["1", "2", "3"], DocumentArray) # delitem - del da['1', '2'] + del da["1", "2"] assert len(da) == 1 if config_gen: da = DocumentArray(storage=storage, config=config_gen()) else: da = DocumentArray(storage=storage) - da.extend([Document(id=str(i), text='hey') for i in range(3)]) - del da['1', 'text'] + da.extend([Document(id=str(i), text="hey") for i in range(3)]) + del da["1", "text"] assert len(da) == 3 assert not da[1].text with pytest.raises( 
ValueError, - match='setting the ID of a Document stored in a DocumentArray to None is not allowed', + match="setting the ID of a Document stored in a DocumentArray to None is not allowed", ): - del da['1', 'id'] + del da["1", "id"] - del da['2', 'hello'] + del da["2", "hello"] # setitem if config_gen: da = DocumentArray(storage=storage, config=config_gen()) else: da = DocumentArray(storage=storage) - da.extend([Document(id='1'), Document(id='2'), Document(id='3')]) - da['1', '2'] = DocumentArray.empty(2) - assert da[0].id != '1' - assert da[1].id != '2' + da.extend([Document(id="1"), Document(id="2"), Document(id="3")]) + da["1", "2"] = DocumentArray.empty(2) + assert da[0].id != "1" + assert da[1].id != "2" if config_gen: da = DocumentArray( - [Document(id='1'), Document(id='2'), Document(id='3')], + [Document(id="1"), Document(id="2"), Document(id="3")], storage=storage, config=config_gen(), ) else: da = DocumentArray( - [Document(id='1'), Document(id='2'), Document(id='3')], storage=storage + [Document(id="1"), Document(id="2"), Document(id="3")], storage=storage ) - da['1', 'text'] = 'hello' - assert da['1', 'text'] == 'hello' - assert da['1'].text == 'hello' + da["1", "text"] = "hello" + assert da["1", "text"] == "hello" + assert da["1"].text == "hello" with pytest.raises(IndexError): - da['1', 'hellohello'] = 'hello' + da["1", "hellohello"] = "hello" @pytest.mark.parametrize( - 'storage,config', + "storage,config", [ - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), + ("annlite", AnnliteConfig(n_dim=123)), + ("qdrant", QdrantConfig(n_dim=123)), + ("elasticsearch", ElasticConfig(n_dim=123)), + ("opensearch", OpenSearchConfig(n_dim=123)), + ("redis", RedisConfig(n_dim=123)), + ("milvus", MilvusConfig(n_dim=123)), ], ) def test_offset2ids_persistence(storage, config): @@ -768,16 +799,16 @@ def test_offset2ids_persistence(storage, config): with da: da.extend( [ - Document(id='0'), - Document(id='2'), - Document(id='4'), + Document(id="0"), + Document(id="2"), + Document(id="4"), ] ) - da.insert(1, Document(id='1')) - da.insert(3, Document(id='3')) + da.insert(1, Document(id="1")) + da.insert(3, Document(id="3")) config = da._config - da_ids = da[:, 'id'] + da_ids = da[:, "id"] if isinstance(da, DocumentArrayAnnlite): da._annlite.close() @@ -786,28 +817,28 @@ def test_offset2ids_persistence(storage, config): da1 = DocumentArray(storage=storage, config=config) - assert da1[:, 'id'] == da_ids + assert da1[:, "id"] == da_ids with da1: - da1.extend([Document(id=i) for i in 'abc']) - da1_ids = da1[:, 'id'] + da1.extend([Document(id=i) for i in "abc"]) + da1_ids = da1[:, "id"] assert len(da1) == 8 if isinstance(da, DocumentArrayAnnlite): da1._annlite.close() da2 = DocumentArray(storage=storage, config=config) - assert da2[:, 'id'] == da1_ids + assert da2[:, "id"] == da1_ids def test_dam_conflicting_ids(): docs = [ - Document(id='1'), - Document(id='2'), - Document(id='3'), + Document(id="1"), + Document(id="2"), + Document(id="3"), ] - d = Document(id='1') + d = Document(id="1") da = DocumentArray() da.extend(docs) da.append(d) @@ -816,8 +847,8 @@ def test_dam_conflicting_ids(): assert id(da[0]) == id(docs[0]) assert id(da[3]) == id(d) - da[0].text = 'd1' - da[3].text = 'd2' + da[0].text = "d1" + da[3].text = "d2" - assert docs[0].text == 'd1' - assert d.text == 'd2' + assert docs[0].text == "d1" + assert d.text == "d2" diff --git 
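The subindex tests above also run against OpenSearch now; each traversal path (such as `@c`) gets its own backing index. A sketch of subindex construction and per-chunk search, assuming the per-subindex config takes the same keys as the top-level one:

```python
import numpy as np
from docarray import Document, DocumentArray

# Sketch: '@c' gets its own OpenSearch-backed index with its own n_dim.
da = DocumentArray(
    [
        Document(chunks=[Document(embedding=np.random.rand(123)) for _ in range(5)])
        for _ in range(3)
    ],
    storage='opensearch',
    config={'n_dim': 123},
    subindex_configs={'@c': {'n_dim': 123}},
)

with da:
    res = da.find(np.random.rand(123), on='@c')  # search within chunks
assert len(res) > 0
```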
a/tests/unit/array/test_construct.py b/tests/unit/array/test_construct.py
index e0e68d4b834..4af558d9ad0 100644
--- a/tests/unit/array/test_construct.py
+++ b/tests/unit/array/test_construct.py
@@ -3,9 +3,11 @@
 from docarray import Document
 from docarray.array.memory import DocumentArrayInMemory
 from docarray.array.elastic import DocumentArrayElastic, ElasticConfig
+from docarray.array.opensearch import DocumentArrayOpenSearch
 from docarray.array.qdrant import DocumentArrayQdrant
 from docarray.array.sqlite import DocumentArraySqlite
 from docarray.array.annlite import DocumentArrayAnnlite, AnnliteConfig
+from docarray.array.storage.opensearch import OpenSearchConfig
 from docarray.array.storage.qdrant import QdrantConfig
 from docarray.array.weaviate import DocumentArrayWeaviate, WeaviateConfig
 from docarray.array.elastic import DocumentArrayElastic, ElasticConfig
@@ -24,6 +26,7 @@
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
     ],
 )
 def test_construct_docarray(da_cls, config, start_storage):
@@ -74,6 +77,7 @@ def test_construct_docarray(da_cls, config, start_storage):
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
     ],
 )
 @pytest.mark.parametrize('is_copy', [True, False])
@@ -105,6 +109,7 @@ def test_docarray_copy_singleton(da_cls, config, is_copy, start_storage):
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
     ],
 )
 @pytest.mark.parametrize('is_copy', [True, False])
@@ -135,6 +140,7 @@ def test_docarray_copy_da(da_cls, config, is_copy, start_storage):
         (DocumentArrayElastic, ElasticConfig(n_dim=128)),
         (DocumentArrayRedis, RedisConfig(n_dim=128)),
         (DocumentArrayMilvus, MilvusConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, OpenSearchConfig(n_dim=128)),
     ],
 )
 @pytest.mark.parametrize('is_copy', [True, False])
diff --git a/tests/unit/array/test_pull_out.py b/tests/unit/array/test_pull_out.py
index c36dde294b0..39191ac3a8e 100644
--- a/tests/unit/array/test_pull_out.py
+++ b/tests/unit/array/test_pull_out.py
@@ -22,6 +22,7 @@ def docs():
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -58,6 +59,7 @@ def test_update_embedding(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -94,6 +96,7 @@ def test_update_doc_embedding(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -128,6 +131,7 @@ def test_batch_update_embedding(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -164,6 +168,7 @@ def test_batch_update_doc_embedding(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -187,6 +192,7 @@ def test_update_id(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -209,6 +215,7 @@ def test_update_doc_id(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
@@ -234,6 +241,7 @@ def test_batch_update_id(docs, storage, config, start_storage):
         ('annlite', {'n_dim': 2}),
         ('qdrant', {'n_dim': 2}),
         ('elasticsearch', {'n_dim': 2}),
+        ('opensearch', {'n_dim': 2}),
         ('redis', {'n_dim': 2}),
         ('milvus', {'n_dim': 2}),
     ],
diff --git a/tests/unit/array/test_sequence.py b/tests/unit/array/test_sequence.py
index 5c86237e049..f076bbfe556 100644
--- a/tests/unit/array/test_sequence.py
+++ b/tests/unit/array/test_sequence.py
@@ -6,10 +6,12 @@
 from docarray import Document, DocumentArray
 from docarray.array.elastic import DocumentArrayElastic
 from docarray.array.memory import DocumentArrayInMemory
+from docarray.array.opensearch import DocumentArrayOpenSearch
 from docarray.array.qdrant import DocumentArrayQdrant
 from docarray.array.redis import DocumentArrayRedis
 from docarray.array.sqlite import DocumentArraySqlite
 from docarray.array.storage.elastic import ElasticConfig
+from docarray.array.storage.opensearch import OpenSearchConfig
 from docarray.array.storage.qdrant import QdrantConfig
 from docarray.array.storage.redis import RedisConfig
 from docarray.array.storage.sqlite import SqliteConfig
@@ -29,6 +31,7 @@
         (DocumentArrayElastic, lambda: ElasticConfig(n_dim=1)),
         (DocumentArrayRedis, lambda: RedisConfig(n_dim=1)),
         (DocumentArrayMilvus, lambda: MilvusConfig(n_dim=128)),
+        (DocumentArrayOpenSearch, lambda: OpenSearchConfig(n_dim=1)),
     ],
 )
 def test_insert(da_cls, config, start_storage):
@@ -86,6 +89,7 @@ def update_config_inplace(config, tmpdir, tmpfile):
         ('weaviate', {'n_dim': 3, 'name': 'Weaviate'}),
         ('qdrant', {'n_dim': 3, 'collection_name': 'qdrant'}),
         ('elasticsearch', {'n_dim': 3, 'index_name': 'elasticsearch'}),
+        ('opensearch', {'n_dim': 3, 'index_name': 'opensearch'}),
         ('redis', {'n_dim': 3, 'index_name': 'redis'}),
         ('milvus', {'n_dim': 3, 'collection_name': 'redis'}),
     ],
 )
@@ -125,6 +129,7 @@ def test_context_manager_from_disk(storage, config, start_storage, tmpdir, tmpfile):
         ('sqlite', dict()),
         ('redis', {'n_dim': 3, 'distance': 'L2'}),
         ('milvus', {'n_dim': 3, 'distance': 'L2'}),
+        ('opensearch', {'n_dim': 3, 'distance': 'l2'}),
     ],
 )
 def test_extend_subindex(storage, config, start_storage):
@@ -172,6 +177,7 @@ def test_extend_subindex(storage, config, start_storage):
         ('sqlite', dict()),
         ('redis', {'n_dim': 3, 'distance': 'L2'}),
         ('milvus', {'n_dim': 3, 'distance': 'L2'}),
+        ('opensearch', {'n_dim': 3, 'distance': 'l2'}),
     ],
 )
 def test_append_subindex(storage, config, start_storage):
@@ -223,6 +229,7 @@ def embeddings_eq(emb1, emb2):
         ('sqlite', dict()),
         ('redis', {'n_dim': 3, 'distance': 'L2'}),
         ('milvus', {'n_dim': 3, 'distance': 'L2'}),
+        ('opensearch', {'n_dim': 3, 'distance': 'l2'}),
     ],
 )
 @pytest.mark.parametrize(
@@ -251,6 +258,7 @@ def test_del_and_append(index, storage, config, start_storage):
         ('sqlite', dict()),
         ('redis', {'n_dim': 3, 'distance': 'L2'}),
         ('milvus', {'n_dim': 3, 'distance': 'L2'}),
+        ('opensearch', {'n_dim': 3, 'distance': 'l2'}),
     ],
 )
 @pytest.mark.parametrize(
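
For context, a minimal usage sketch of what the new parametrizations exercise; it is not part of the diff itself. It assumes an OpenSearch service is reachable at the default host in OpenSearchConfig (the tests rely on the start_storage fixture for that), and the config values and embeddings below are illustrative only.

    # Sketch only: construct an OpenSearch-backed DocumentArray the same way
    # the parametrized tests above do. Assumes a reachable OpenSearch service
    # at the default host configured in OpenSearchConfig.
    from docarray import Document, DocumentArray

    da = DocumentArray(
        storage='opensearch',
        config={'n_dim': 2, 'distance': 'l2'},  # same dict form the tests pass
    )
    da.extend([Document(embedding=[1.0, 2.0]), Document(embedding=[3.0, 4.0])])
    assert len(da) == 2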