From 18fb99913b30bc02c97d20362dd040ceeb0449e3 Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 23 Nov 2022 21:00:41 +0800 Subject: [PATCH 1/6] fix: rebuild index of redis when clear storage Signed-off-by: AnneY --- docarray/array/storage/redis/backend.py | 29 ++++++++++++----------- docarray/array/storage/redis/getsetdel.py | 1 + 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/docarray/array/storage/redis/backend.py b/docarray/array/storage/redis/backend.py index 8652f0fefee..fc60d671eef 100644 --- a/docarray/array/storage/redis/backend.py +++ b/docarray/array/storage/redis/backend.py @@ -84,7 +84,14 @@ def _init_storage( self._doc_prefix = config.index_name + ':' self._config.columns = self._normalize_columns(self._config.columns) - self._client = self._build_client() + self._client = Redis( + host=self._config.host, + port=self._config.port, + **self._config.redis_config, + ) + + self._initialize_redis_index() + super()._init_storage() if _docs is None: @@ -94,25 +101,19 @@ def _init_storage( elif isinstance(_docs, Document): self.append(_docs) - def _build_client(self): - client = Redis( - host=self._config.host, - port=self._config.port, - **self._config.redis_config, - ) - - if self._config.update_schema: - if self._config.index_name.encode() in client.execute_command('FT._LIST'): - client.ft(index_name=self._config.index_name).dropindex() + def _initialize_redis_index(self, rebuild: bool = False): + if self._config.update_schema or rebuild: + if self._config.index_name.encode() in self._client.execute_command( + 'FT._LIST' + ): + self._client.ft(index_name=self._config.index_name).dropindex() schema = self._build_schema_from_redis_config() idef = IndexDefinition(prefix=[self._doc_prefix]) - client.ft(index_name=self._config.index_name).create_index( + self._client.ft(index_name=self._config.index_name).create_index( schema, definition=idef ) - return client - def _ensure_unique_config( self, config_root: dict, diff --git a/docarray/array/storage/redis/getsetdel.py b/docarray/array/storage/redis/getsetdel.py index 41eb61c77ac..30908470361 100644 --- a/docarray/array/storage/redis/getsetdel.py +++ b/docarray/array/storage/redis/getsetdel.py @@ -125,4 +125,5 @@ def _clear_storage(self): self._client.ft(index_name=self._config.index_name).dropindex( delete_documents=True ) + self._initialize_redis_index(rebuild=True) self._client.delete(self._offset2id_key) From 7105b7faa0fd96ed674b337f2e126ca1b9c6c1d3 Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 23 Nov 2022 22:05:10 +0800 Subject: [PATCH 2/6] refactor: change build index funtion name Signed-off-by: AnneY --- docarray/array/storage/redis/backend.py | 4 ++-- docarray/array/storage/redis/getsetdel.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docarray/array/storage/redis/backend.py b/docarray/array/storage/redis/backend.py index fc60d671eef..1600c914fe9 100644 --- a/docarray/array/storage/redis/backend.py +++ b/docarray/array/storage/redis/backend.py @@ -90,7 +90,7 @@ def _init_storage( **self._config.redis_config, ) - self._initialize_redis_index() + self._build_index() super()._init_storage() @@ -101,7 +101,7 @@ def _init_storage( elif isinstance(_docs, Document): self.append(_docs) - def _initialize_redis_index(self, rebuild: bool = False): + def _build_index(self, rebuild: bool = False): if self._config.update_schema or rebuild: if self._config.index_name.encode() in self._client.execute_command( 'FT._LIST' diff --git a/docarray/array/storage/redis/getsetdel.py b/docarray/array/storage/redis/getsetdel.py index 30908470361..3e7d467d020 100644 --- a/docarray/array/storage/redis/getsetdel.py +++ b/docarray/array/storage/redis/getsetdel.py @@ -125,5 +125,5 @@ def _clear_storage(self): self._client.ft(index_name=self._config.index_name).dropindex( delete_documents=True ) - self._initialize_redis_index(rebuild=True) + self._build_index(rebuild=True) self._client.delete(self._offset2id_key) From e4b4b60a8a42f1b7b95121294f56378b85cfaaac Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 23 Nov 2022 22:07:36 +0800 Subject: [PATCH 3/6] fix: rebuild index of elastic when clear storage Signed-off-by: AnneY --- docarray/array/storage/elastic/backend.py | 23 ++++++++++----------- docarray/array/storage/elastic/getsetdel.py | 1 + 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/docarray/array/storage/elastic/backend.py b/docarray/array/storage/elastic/backend.py index 99b6b27fddc..3f30adc731d 100644 --- a/docarray/array/storage/elastic/backend.py +++ b/docarray/array/storage/elastic/backend.py @@ -93,8 +93,14 @@ def _init_storage( self._config.columns = self._normalize_columns(self._config.columns) self.n_dim = self._config.n_dim - self._client = self._build_client() self._list_like = self._config.list_like + + self._client = Elasticsearch( + hosts=self._config.hosts, + **self._config.es_config, + ) + + self._build_index() self._build_offset2id_index() # Note super()._init_storage() calls _load_offset2ids which calls _get_offset2ids_meta @@ -166,22 +172,15 @@ def _build_schema_from_elastic_config(self, elastic_config): ] = index_options return da_schema - def _build_client(self): - - client = Elasticsearch( - hosts=self._config.hosts, - **self._config.es_config, - ) - + def _build_index(self): schema = self._build_schema_from_elastic_config(self._config) - if not client.indices.exists(index=self._config.index_name): - client.indices.create( + if not self._client.indices.exists(index=self._config.index_name): + self._client.indices.create( index=self._config.index_name, mappings=schema['mappings'] ) - client.indices.refresh(index=self._config.index_name) - return client + self._client.indices.refresh(index=self._config.index_name) def _send_requests(self, request, **kwargs) -> List[Dict]: """Send bulk request to Elastic and gather the successful info""" diff --git a/docarray/array/storage/elastic/getsetdel.py b/docarray/array/storage/elastic/getsetdel.py index 019250875b5..d93ac105189 100644 --- a/docarray/array/storage/elastic/getsetdel.py +++ b/docarray/array/storage/elastic/getsetdel.py @@ -121,6 +121,7 @@ def _del_doc_by_id(self, _id: str): def _clear_storage(self): """Concrete implementation of base class' ``_clear_storage``""" self._client.indices.delete(index=self._config.index_name) + self._build_index() def _load_offset2ids(self): if self._list_like: From 708e918dbdde5d3b2c695b49b607f02fbee56e80 Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 23 Nov 2022 22:14:24 +0800 Subject: [PATCH 4/6] test: add tests for getset subindex in store Signed-off-by: AnneY --- tests/unit/array/test_advance_indexing.py | 55 ++++++++++++++++------- 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/tests/unit/array/test_advance_indexing.py b/tests/unit/array/test_advance_indexing.py index ac27fa8bf6f..86fb1a07f9e 100644 --- a/tests/unit/array/test_advance_indexing.py +++ b/tests/unit/array/test_advance_indexing.py @@ -459,25 +459,10 @@ def test_path_syntax_indexing_set(storage, config, use_subindex, start_storage): assert da[2].id == 'new_id' -@pytest.mark.parametrize( - 'storage,config', - [ - ('memory', None), - ('sqlite', None), - ('weaviate', WeaviateConfig(n_dim=123)), - ('annlite', AnnliteConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123)), - ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True)), - ('elasticsearch', ElasticConfig(n_dim=123)), - ('redis', RedisConfig(n_dim=123)), - ('milvus', MilvusConfig(n_dim=123)), - ], -) -def test_getset_subindex(storage, config, start_storage): +def test_getset_subindex(): da = DocumentArray( [Document(chunks=[Document() for _ in range(5)]) for _ in range(3)], - config=config, - subindex_configs={'@c': {'n_dim': 123}} if config else {'@c': None}, + subindex_configs={'@c': None}, ) with da: assert len(da['@c']) == 15 @@ -509,6 +494,42 @@ def test_getset_subindex(storage, config, start_storage): assert collected_chunks == new_chunks +@pytest.mark.parametrize( + 'storage,config,subindex_config', + [ + ('memory', None, None), + ('sqlite', None, None), + ('weaviate', WeaviateConfig(n_dim=123), {'n_dim': 123}), + ('annlite', AnnliteConfig(n_dim=123), {'n_dim': 123}), + ('qdrant', QdrantConfig(n_dim=123), {'n_dim': 123}), + ('qdrant', QdrantConfig(n_dim=123, prefer_grpc=True), {'n_dim': 123}), + ('elasticsearch', ElasticConfig(n_dim=123), {'n_dim': 123}), + ('redis', RedisConfig(n_dim=123), {'n_dim': 123}), + ('milvus', MilvusConfig(n_dim=123), {'n_dim': 123}), + ], +) +def test_getset_subindex_in_store(storage, config, subindex_config, start_storage): + da = DocumentArray( + [Document(chunks=[Document() for _ in range(5)]) for _ in range(3)], + storage=storage, + config=config, + subindex_configs={'@c': subindex_config}, + ) + with da: + assert len(da['@c']) == 15 + assert len(da._subindices['@c']) == 15 + + chunks_ids = [c.id for c in da['@c']] + new_chunks = [ + Document(id=cid, embedding=np.ones(123) * i) + for i, cid in enumerate(chunks_ids) + ] + da['@c'] = new_chunks + + res = da.find(np.random.random(123), on='@c') + assert len(res) > 0 + + @pytest.mark.parametrize('size', [1, 5]) @pytest.mark.parametrize( 'storage,config_gen', From dcb42b0bdaa9a29ebf942c457596a294547d01fe Mon Sep 17 00:00:00 2001 From: AnneY Date: Fri, 25 Nov 2022 17:33:02 +0800 Subject: [PATCH 5/6] fix: fix elastic __setstate__ client Signed-off-by: AnneY --- docarray/array/storage/elastic/backend.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docarray/array/storage/elastic/backend.py b/docarray/array/storage/elastic/backend.py index 3f30adc731d..520ae229b68 100644 --- a/docarray/array/storage/elastic/backend.py +++ b/docarray/array/storage/elastic/backend.py @@ -286,4 +286,7 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__ = state - self._client = self._build_client() + self._client = Elasticsearch( + hosts=self._config.hosts, + **self._config.es_config, + ) From df982f44fdf4496ee8a6a216b79ca16dac86e2f8 Mon Sep 17 00:00:00 2001 From: AnneY Date: Fri, 25 Nov 2022 19:18:40 +0800 Subject: [PATCH 6/6] refactor: keep _build_client Signed-off-by: AnneY --- docarray/array/storage/elastic/backend.py | 19 ++++++++++--------- docarray/array/storage/redis/backend.py | 21 ++++++++++----------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/docarray/array/storage/elastic/backend.py b/docarray/array/storage/elastic/backend.py index 520ae229b68..d71d5e3bde2 100644 --- a/docarray/array/storage/elastic/backend.py +++ b/docarray/array/storage/elastic/backend.py @@ -95,11 +95,7 @@ def _init_storage( self.n_dim = self._config.n_dim self._list_like = self._config.list_like - self._client = Elasticsearch( - hosts=self._config.hosts, - **self._config.es_config, - ) - + self._client = self._build_client() self._build_index() self._build_offset2id_index() @@ -172,6 +168,14 @@ def _build_schema_from_elastic_config(self, elastic_config): ] = index_options return da_schema + def _build_client(self): + client = Elasticsearch( + hosts=self._config.hosts, + **self._config.es_config, + ) + + return client + def _build_index(self): schema = self._build_schema_from_elastic_config(self._config) @@ -286,7 +290,4 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__ = state - self._client = Elasticsearch( - hosts=self._config.hosts, - **self._config.es_config, - ) + self._client = self._build_client() diff --git a/docarray/array/storage/redis/backend.py b/docarray/array/storage/redis/backend.py index 1600c914fe9..1fbcc0dd9c1 100644 --- a/docarray/array/storage/redis/backend.py +++ b/docarray/array/storage/redis/backend.py @@ -84,12 +84,7 @@ def _init_storage( self._doc_prefix = config.index_name + ':' self._config.columns = self._normalize_columns(self._config.columns) - self._client = Redis( - host=self._config.host, - port=self._config.port, - **self._config.redis_config, - ) - + self._client = self._build_client() self._build_index() super()._init_storage() @@ -101,6 +96,14 @@ def _init_storage( elif isinstance(_docs, Document): self.append(_docs) + def _build_client(self): + client = Redis( + host=self._config.host, + port=self._config.port, + **self._config.redis_config, + ) + return client + def _build_index(self, rebuild: bool = False): if self._config.update_schema or rebuild: if self._config.index_name.encode() in self._client.execute_command( @@ -196,8 +199,4 @@ def __getstate__(self): def __setstate__(self, state): self.__dict__ = state - self._client = Redis( - host=self._config.host, - port=self._config.port, - **self._config.redis_config, - ) + self._client = self._build_client()