From c1c32d02c03ae9ca537baee284d380a8a7a241a6 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Tue, 30 Aug 2022 16:02:57 +0100 Subject: [PATCH 01/33] fix: adapt benchmark script to latest docarray --- scripts/benchmarking.py | 74 +++++++++++++++++++++++++------------- scripts/docker-compose.yml | 12 ++++--- 2 files changed, 58 insertions(+), 28 deletions(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 0d2dc057844..ad623b451e8 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -12,7 +12,7 @@ from rich.console import Console from rich.table import Table -n_index_values = [1_000_000] +n_index_values = [1_000] n_query = 1 D = 128 TENSOR_SHAPE = (512, 256) @@ -98,12 +98,12 @@ def recall(predicted, relevant, eval_at): if args.default_hnsw: storage_backends = [ - # ('memory', None), - # ('sqlite', None), - # ( - # 'annlite', - # {'n_dim': D}, - # ), + ('memory', None), + ('sqlite', None), + ( + 'annlite', + {'n_dim': D, 'columns': [('i', 'int')]}, + ), ( 'qdrant', { @@ -117,13 +117,15 @@ def recall(predicted, relevant, eval_at): { 'n_dim': D, 'port': '41234', + 'columns': ('i', 'int'), }, ), ( 'elasticsearch', { 'n_dim': D, - 'port': '41235', + 'hosts': 'http://localhost:41235', + 'columns': [('i', 'int')], }, ), ( @@ -131,22 +133,24 @@ def recall(predicted, relevant, eval_at): { 'n_dim': D, 'port': '41236', + 'columns': [('i', 'int')], }, ), ] else: storage_backends = [ - # ('memory', None), - # ('sqlite', None), - # ( - # 'annlite', - # { - # 'n_dim': D, - # 'ef_construction': 100, - # 'ef_search': 100, - # 'max_connection': 16, - # }, - # ), + ('memory', None), + ('sqlite', None), + ( + 'annlite', + { + 'n_dim': D, + 'ef_construction': 100, + 'ef_search': 100, + 'max_connection': 16, + 'columns': [('i', 'int')], + }, + ), ( 'qdrant', { @@ -165,15 +169,32 @@ def recall(predicted, relevant, eval_at): 'ef_construction': 100, 'max_connections': 16, 'port': '41234', + 'columns': [('i', 'int')], }, ), ( 'elasticsearch', - 
{'n_dim': D, 'ef_construction': 100, 'm': 16, 'port': '41235'}, + { + 'n_dim': D, + 'ef_construction': 100, + 'm': 16, + 'hosts': 'http://localhost:41235', + 'columns': [('i', 'int')], + }, ), ('redis', {'n_dim': D, 'ef_construction': 100, 'm': 16, 'port': '41236'}), ] +storage_backend_filters = { + 'memory': {'tags__i': {'$eq': 0}}, + 'sqlite': {'tags__i': {'$eq': 0}}, + 'annlite': {'i': {'$eq': 0}}, + 'qdrant': {'tags__i': {'$eq': 0}}, + 'weaviate': {'path': 'i', 'operator': 'Equal', 'valueInt': 0}, + 'elasticsearch': {'match': {'i': 0}}, + 'redis': {'i': {'$eq': 0}}, +} + table = Table( title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' ) @@ -236,13 +257,15 @@ def recall(predicted, relevant, eval_at): f'finding {n_query} docs by vector averaged {n_vector_queries} times ...' ) if backend == 'sqlite': - find_by_vector_time, result = find_by_vector(da, vector_queries[0]) + find_by_vector_time, result = find_by_vector( + da, vector_queries[0].squeeze() + ) recall_at_k = recall(result, ground_truth[0], K) else: recall_at_k_values = [] find_by_vector_times = [] for i, query in enumerate(vector_queries): - find_by_vector_time, results = find_by_vector(da, query) + find_by_vector_time, results = find_by_vector(da, query.squeeze()) find_by_vector_times.append(find_by_vector_time) if backend == 'memory': ground_truth.append(results) @@ -256,7 +279,9 @@ def recall(predicted, relevant, eval_at): ) console.print(f'finding {n_query} docs by condition ...') - find_by_condition_time, _ = find_by_condition(da, {'tags__i': {'$eq': 0}}) + find_by_condition_time, _ = find_by_condition( + da, storage_backend_filters[backend] + ) if idx == len(n_index_values) - 1: table.add_row( @@ -290,7 +315,8 @@ def recall(predicted, relevant, eval_at): find_by_vector_values[str(n_index)].append(find_by_vector_time) create_values[str(n_index)].append(create_time) except Exception as e: - console.print(f'Storage Backend {backend} failed: {e}') + 
console.print(f'Storage Backend {backend} failed') + raise e find_df = pd.DataFrame(find_by_vector_values) find_df.index = [backend for backend, _ in storage_backends] diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 4d917761b60..638375158ae 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -3,7 +3,7 @@ services: weaviate: image: semitechnologies/weaviate:1.13.2 ports: - - "41234:41234" + - "41234:8080" environment: CONTEXTIONARY_URL: contextionary:9999 QUERY_DEFAULTS_LIMIT: 25 @@ -12,7 +12,7 @@ services: qdrant: image: qdrant/qdrant:v0.7.0 ports: - - "41233:41233" + - "41233:6333" ulimits: # Only required for tests, as there are a lot of collections created nofile: soft: 65535 @@ -23,8 +23,12 @@ services: - xpack.security.enabled=false - discovery.type=single-node ports: - - "41235:41235" + - "41235:9200" redis: image: redislabs/redisearch:2.6.0 ports: - - "41236:41236" + - "41236:6379" + +networks: + elastic: + name: elastic From 77b307035c989ed7e068a4ff88c80c1cc5e261d9 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Tue, 30 Aug 2022 16:20:52 +0100 Subject: [PATCH 02/33] chore: increase index size --- scripts/benchmarking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index ad623b451e8..0880a4778f3 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -12,7 +12,7 @@ from rich.console import Console from rich.table import Table -n_index_values = [1_000] +n_index_values = [1_000_000] n_query = 1 D = 128 TENSOR_SHAPE = (512, 256) From 8bebacde101923c6e571934d037de9488efc7375 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Wed, 31 Aug 2022 16:22:49 +0100 Subject: [PATCH 03/33] chore: add exclude backends --- scripts/benchmarking.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 0880a4778f3..75f0b32eef7 100644 --- 
a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -12,6 +12,8 @@ from rich.console import Console from rich.table import Table +np.random.seed(123) + n_index_values = [1_000_000] n_query = 1 D = 128 @@ -26,6 +28,12 @@ help='Whether to use default HNSW configurations', action='store_true', ) + +parser.add_argument( + '--exclude-backends', + help='list of comma separated backends to exclude from the benchmarks', + type=str, +) args = parser.parse_args() times = {} @@ -185,6 +193,12 @@ def recall(predicted, relevant, eval_at): ('redis', {'n_dim': D, 'ef_construction': 100, 'm': 16, 'port': '41236'}), ] +storage_backends = [ + (backend, config) + for backend, config in storage_backends + if backend not in args.exclude_backends.split(',') +] + storage_backend_filters = { 'memory': {'tags__i': {'$eq': 0}}, 'sqlite': {'tags__i': {'$eq': 0}}, From 6fa645dcb17c89c861137e30771c4457b42ada4e Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Thu, 1 Sep 2022 10:33:16 +0100 Subject: [PATCH 04/33] fix: ignore errors in drop --- scripts/benchmarking.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 75f0b32eef7..18bf29494c4 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -334,7 +334,7 @@ def recall(predicted, relevant, eval_at): find_df = pd.DataFrame(find_by_vector_values) find_df.index = [backend for backend, _ in storage_backends] -find_df = find_df.drop(['sqlite']) +find_df = find_df.drop(['sqlite'], errors='ignore') print(find_df) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) From 3f1ea34bd73c7935e6198a10a59a200591ca18e5 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Tue, 20 Sep 2022 07:47:36 +0100 Subject: [PATCH 05/33] chore: ef_runtime for redis --- scripts/benchmarking.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 18bf29494c4..70fc010f9dc 100644 --- 
a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -190,7 +190,16 @@ def recall(predicted, relevant, eval_at): 'columns': [('i', 'int')], }, ), - ('redis', {'n_dim': D, 'ef_construction': 100, 'm': 16, 'port': '41236'}), + ( + 'redis', + { + 'n_dim': D, + 'ef_construction': 100, + 'm': 16, + 'ef_runtime': 100, + 'port': '41236', + }, + ), ] storage_backends = [ From 4c8f2ccb0d46245ef88a90421567551eef718475 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Tue, 20 Sep 2022 07:58:35 +0100 Subject: [PATCH 06/33] fix: fix exclude --- scripts/benchmarking.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 70fc010f9dc..e5b55fb1df6 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -105,6 +105,7 @@ def recall(predicted, relevant, eval_at): if args.default_hnsw: + print('here') storage_backends = [ ('memory', None), ('sqlite', None), @@ -205,7 +206,7 @@ def recall(predicted, relevant, eval_at): storage_backends = [ (backend, config) for backend, config in storage_backends - if backend not in args.exclude_backends.split(',') + if backend not in (args.exclude_backends or '').split(',') ] storage_backend_filters = { From 0f47ee3c2b31f2b009b319cd446d2bc8aa66ec76 Mon Sep 17 00:00:00 2001 From: Alaeddine Abdessalem Date: Tue, 20 Sep 2022 08:25:15 +0100 Subject: [PATCH 07/33] chore: remove print --- scripts/benchmarking.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index e5b55fb1df6..453a1afa276 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -105,7 +105,6 @@ def recall(predicted, relevant, eval_at): if args.default_hnsw: - print('here') storage_backends = [ ('memory', None), ('sqlite', None), From 859c6daaa0631f0cf40919468160a88eb1d86983 Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 12 Oct 2022 16:06:18 +0800 Subject: [PATCH 08/33] fix: update data type of columns --- scripts/benchmarking.py 
| 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 453a1afa276..e205202e800 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -110,7 +110,10 @@ def recall(predicted, relevant, eval_at): ('sqlite', None), ( 'annlite', - {'n_dim': D, 'columns': [('i', 'int')]}, + { + 'n_dim': D, + 'columns': {'i': 'int'}, + }, ), ( 'qdrant', @@ -125,7 +128,7 @@ def recall(predicted, relevant, eval_at): { 'n_dim': D, 'port': '41234', - 'columns': ('i', 'int'), + 'columns': {'i': 'int'}, }, ), ( @@ -133,7 +136,7 @@ def recall(predicted, relevant, eval_at): { 'n_dim': D, 'hosts': 'http://localhost:41235', - 'columns': [('i', 'int')], + 'columns': {'i': 'int'}, }, ), ( @@ -141,7 +144,7 @@ def recall(predicted, relevant, eval_at): { 'n_dim': D, 'port': '41236', - 'columns': [('i', 'int')], + 'columns': {'i': 'int'}, }, ), ] @@ -156,7 +159,7 @@ def recall(predicted, relevant, eval_at): 'ef_construction': 100, 'ef_search': 100, 'max_connection': 16, - 'columns': [('i', 'int')], + 'columns': {'i': 'int'}, }, ), ( @@ -177,7 +180,7 @@ def recall(predicted, relevant, eval_at): 'ef_construction': 100, 'max_connections': 16, 'port': '41234', - 'columns': [('i', 'int')], + 'columns': {'i': 'int'}, }, ), ( @@ -187,7 +190,7 @@ def recall(predicted, relevant, eval_at): 'ef_construction': 100, 'm': 16, 'hosts': 'http://localhost:41235', - 'columns': [('i', 'int')], + 'columns': {'i': 'int'}, }, ), ( From 51110c1f31c36e0bc85d73e430f1228f69244afa Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 12 Oct 2022 16:11:18 +0800 Subject: [PATCH 09/33] fix: fix benchmark_df append --- scripts/benchmarking.py | 27 ++++++++++----------------- 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index e205202e800..eeb1fbbd294 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -320,23 +320,16 @@ def recall(predicted, relevant, 
eval_at): '{:.3f}'.format(recall_at_k), fmt(find_by_condition_time, 's'), ) - benchmark_df.append( - pd.DataFrame( - [ - [ - backend.title(), - create_time, - read_time, - update_time, - delete_time, - find_by_vector_time, - recall_at_k, - find_by_condition_time, - ] - ], - columns=benchmark_df.columns, - ) - ) + benchmark_df.loc[len(benchmark_df.index)] = [ + backend.title(), + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] find_by_vector_values[str(n_index)].append(find_by_vector_time) create_values[str(n_index)].append(create_time) From 904c6209a147dc97670566872605caec38afff7b Mon Sep 17 00:00:00 2001 From: David Buchaca Date: Mon, 25 Apr 2022 11:26:09 +0200 Subject: [PATCH 10/33] feat: add benchmark adapted for sift1m --- scripts/benchmarking_sift_1M.py | 308 ++++++++++++++++++++++++++++++++ 1 file changed, 308 insertions(+) create mode 100644 scripts/benchmarking_sift_1M.py diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py new file mode 100644 index 00000000000..fedd3621e93 --- /dev/null +++ b/scripts/benchmarking_sift_1M.py @@ -0,0 +1,308 @@ +import argparse +import functools +import random +from time import perf_counter + +import numpy as np +import pandas as pd +import seaborn as sns +import matplotlib.pyplot as plt + +from docarray import Document, DocumentArray +from rich.console import Console +from rich.table import Table +import h5py +import os + +n_query = 1 +D = 128 +TENSOR_SHAPE = (512, 256) +K = 10 +n_vector_queries = 1000 +np.random.seed(123) +DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5') +dataset = h5py.File(DATASET_PATH, 'r') +n_index_values = [len(X_tr)] + +X_tr = dataset['train'][0:] +X_te = dataset['test'][0:] + +parser = argparse.ArgumentParser() +parser.add_argument( + '--default-hnsw', + help='Whether to use default HNSW configurations', + action='store_true', +) +args = 
parser.parse_args() + +times = {} + + +def timer(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start = perf_counter() + res = func(*args, **kwargs) + return (perf_counter() - start, res) + + return wrapper + + +@timer +def create(da, docs): + da.extend(docs) + + +@timer +def read(da, ids): + da[ids] + + +@timer +def update(da, docs): + da[[d.id for d in docs]] = docs + + +@timer +def delete(da, ids): + del da[ids] + + +@timer +def find_by_condition(da, query): + da.find(query) + + +@timer +def find_by_vector(da, query): + return da.find(query, limit=K) + + +def get_docs(X_tr): + return [ + Document( + embedding=x, + # tensor=np.random.rand(*tensor_shape), + tags={'i': int(i)}, + ) + for i, x in enumerate(X_tr) + ] + + +def fmt(value, unit): + return '{:.3f} {}'.format(value, unit) + + +def recall(predicted, relevant, eval_at): + if eval_at == 0: + return 0.0 + predicted_at_k = predicted[:eval_at] + n_predicted_and_relevant = len( + set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id'])) + ) + return n_predicted_and_relevant / len(relevant) + + +if args.default_hnsw: + storage_backends = [ + ('memory', None), + ('sqlite', None), + ( + 'annlite', + {'n_dim': D}, + ), + ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}), + ('weaviate', {'n_dim': D}), + ('elasticsearch', {'n_dim': D}), + ] +else: + storage_backends = [ + ('memory', None), + ('sqlite', None), + ( + 'annlite', + { + 'n_dim': D, + 'ef_construction': 100, + 'ef_search': 100, + 'max_connection': 16, + }, + ), + ('qdrant', {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16}), + ( + 'weaviate', + {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, + ), + ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}), + ] + +table = Table( + title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' +) +benchmark_df = pd.DataFrame( + { + 'Storage Backend': [], + 'Indexing time (C)': [], + 'Query (R)': [], + 'Update 
(U)': [], + 'Delete (D)': [], + 'Find by vector': [], + f'Recall at k={K} for vector search': [], + 'Find by condition': [], + } +) + +for col in benchmark_df.columns: + table.add_column(col) + +console = Console() +find_by_vector_values = {str(n_index): [] for n_index in n_index_values} +create_values = {str(n_index): [] for n_index in n_index_values} + +console.print(f'Reading dataset') +docs = get_docs(X_tr) +docs_to_delete = random.sample(docs, n_query) +docs_to_update = random.sample(docs, n_query) +vector_queries = [x for x in X_te] +ground_truth = [] + +for idx, n_index in enumerate(n_index_values): + for backend, config in storage_backends: + try: + console.print('Backend:', backend.title()) + # for n_i in n_index: + if not config: + da = DocumentArray(storage=backend) + else: + da = DocumentArray(storage=backend, config=config) + console.print(f'indexing {n_index} docs ...') + create_time, _ = create(da, docs) + # for n_q in n_query: + console.print(f'reading {n_query} docs ...') + read_time, _ = read( + da, + random.sample([d.id for d in docs], n_query), + ) + console.print(f'updating {n_query} docs ...') + update_time, _ = update(da, docs_to_update) + console.print(f'deleting {n_query} docs ...') + delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + console.print( + f'finding {n_query} docs by vector averaged {n_vector_queries} times ...' 
+ ) + if backend == 'sqlite': + find_by_vector_time, result = find_by_vector(da, vector_queries[0]) + recall_at_k = recall(result, ground_truth[0], K) + else: + recall_at_k_values = [] + find_by_vector_times = [] + for i, query in enumerate(vector_queries): + find_by_vector_time, results = find_by_vector(da, query) + find_by_vector_times.append(find_by_vector_time) + if backend == 'memory': + ground_truth.append(results) + recall_at_k_values.append(1) + else: + recall_at_k_values.append(recall(results, ground_truth[i], K)) + recall_at_k = sum(recall_at_k_values) / len(recall_at_k_values) + find_by_vector_time = sum(find_by_vector_times) / len( + find_by_vector_times + ) + console.print(f'finding {n_query} docs by condition ...') + find_by_condition_time, _ = find_by_condition(da, {'tags__i': {'$eq': 0}}) + if idx == len(n_index_values) - 1: + table.add_row( + backend.title(), + fmt(create_time, 's'), + fmt(read_time * 1000, 'ms'), + fmt(update_time * 1000, 'ms'), + fmt(delete_time * 1000, 'ms'), + fmt(find_by_vector_time, 's'), + '{:.3f}'.format(recall_at_k), + fmt(find_by_condition_time, 's'), + ) + benchmark_df.append( + pd.DataFrame( + [ + [ + backend.title(), + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] + ], + columns=benchmark_df.columns, + ) + ) + find_by_vector_values[str(n_index)].append(find_by_vector_time) + create_values[str(n_index)].append(create_time) + except Exception as e: + console.print(f'Storage Backend {backend} failed: {e}') + + +find_df = pd.DataFrame(find_by_vector_values) +find_df.index = [backend for backend, _ in storage_backends] +find_df = find_df.drop(['sqlite']) +print(find_df) +fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) + +find_df.plot( + kind="bar", + ax=ax1, + fontsize=16, + color=sns.color_palette('muted')[1:4], + # title='Find by vector per backend and dataset size', + # ylabel='seconds', + rot=0, +) +ax1.set_ylabel('seconds', fontsize=18) 
+ax1.set_title('Find by vector per backend and dataset size', fontsize=18) + +threshold = 0.3 +ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--') + +create_df = pd.DataFrame(create_values) +create_df.index = [backend for backend, _ in storage_backends] + +create_df = create_df.drop(['memory']) +print(create_df) +create_df.plot( + kind="bar", + ax=ax2, + fontsize=16, + color=sns.color_palette('muted')[1:4], + # title='Indexing per backend and dataset size', + # ylabel='seconds', + rot=0, +) + +ax2.set_ylabel('seconds', fontsize=18) +ax2.set_title('Indexing per backend and dataset size', fontsize=18) + +plt.tight_layout() +ax1.legend(fontsize=15) +ax2.legend(fontsize=15) + +plt.savefig('benchmark.svg') +console.print(table) + +benchmark_df.to_csv('benchmark-seconds.csv') + +benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply( + lambda value: 1_000_000 / value +) +benchmark_df['Query (R)'] = benchmark_df['Query (R)'].apply(lambda value: 1 / value) +benchmark_df['Update (U)'] = benchmark_df['Update (U)'].apply(lambda value: 1 / value) +benchmark_df['Delete (D)'] = benchmark_df['Delete (D)'].apply(lambda value: 1 / value) +benchmark_df['Find by vector'] = benchmark_df['Find by vector'].apply( + lambda value: 1 / value +) +benchmark_df['Find by condition'] = benchmark_df['Find by condition'].apply( + lambda value: 1 / value +) + +benchmark_df.to_csv('benchmark-qps.csv') From 39ed230dc4f1d7721967e7fc9b4b50d542d307eb Mon Sep 17 00:00:00 2001 From: davidbp Date: Mon, 25 Apr 2022 12:11:56 +0200 Subject: [PATCH 11/33] fix: order --- scripts/benchmarking_sift_1M.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py index fedd3621e93..d11cd285d30 100644 --- a/scripts/benchmarking_sift_1M.py +++ b/scripts/benchmarking_sift_1M.py @@ -22,10 +22,10 @@ np.random.seed(123) DATASET_PATH = os.path.join(os.path.expanduser('~'), 
'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5') dataset = h5py.File(DATASET_PATH, 'r') -n_index_values = [len(X_tr)] -X_tr = dataset['train'][0:] -X_te = dataset['test'][0:] +X_tr = dataset['train'][0:1000] +X_te = dataset['test'][0:10] +n_index_values = [len(X_tr)] parser = argparse.ArgumentParser() parser.add_argument( From ae5fa9c375b089647f01025c3fc9cacb4d6392e1 Mon Sep 17 00:00:00 2001 From: davidbp Date: Mon, 25 Apr 2022 13:07:51 +0200 Subject: [PATCH 12/33] fix: drop sqlite in table --- scripts/benchmarking_sift_1M.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py index d11cd285d30..4dbab944477 100644 --- a/scripts/benchmarking_sift_1M.py +++ b/scripts/benchmarking_sift_1M.py @@ -18,14 +18,14 @@ D = 128 TENSOR_SHAPE = (512, 256) K = 10 -n_vector_queries = 1000 np.random.seed(123) DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5') dataset = h5py.File(DATASET_PATH, 'r') -X_tr = dataset['train'][0:1000] -X_te = dataset['test'][0:10] +X_tr = dataset['train'] +X_te = dataset['test'] n_index_values = [len(X_tr)] +n_vector_queries = len(X_te) parser = argparse.ArgumentParser() parser.add_argument( @@ -169,26 +169,26 @@ def recall(predicted, relevant, eval_at): for idx, n_index in enumerate(n_index_values): for backend, config in storage_backends: try: - console.print('Backend:', backend.title()) + console.print('\nBackend:', backend.title()) # for n_i in n_index: if not config: da = DocumentArray(storage=backend) else: da = DocumentArray(storage=backend, config=config) - console.print(f'indexing {n_index} docs ...') + console.print(f'\tindexing {n_index} docs ...') create_time, _ = create(da, docs) # for n_q in n_query: - console.print(f'reading {n_query} docs ...') + console.print(f'\treading {n_query} docs ...') read_time, _ = read( da, random.sample([d.id for d in docs], n_query), ) - 
console.print(f'updating {n_query} docs ...') + console.print(f'\tupdating {n_query} docs ...') update_time, _ = update(da, docs_to_update) - console.print(f'deleting {n_query} docs ...') + console.print(f'\tdeleting {n_query} docs ...') delete_time, _ = delete(da, [d.id for d in docs_to_delete]) console.print( - f'finding {n_query} docs by vector averaged {n_vector_queries} times ...' + f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' ) if backend == 'sqlite': find_by_vector_time, result = find_by_vector(da, vector_queries[0]) @@ -208,7 +208,7 @@ def recall(predicted, relevant, eval_at): find_by_vector_time = sum(find_by_vector_times) / len( find_by_vector_times ) - console.print(f'finding {n_query} docs by condition ...') + console.print(f'\tfinding {n_query} docs by condition ...') find_by_condition_time, _ = find_by_condition(da, {'tags__i': {'$eq': 0}}) if idx == len(n_index_values) - 1: table.add_row( @@ -240,13 +240,13 @@ def recall(predicted, relevant, eval_at): ) find_by_vector_values[str(n_index)].append(find_by_vector_time) create_values[str(n_index)].append(create_time) + console.print(table) except Exception as e: console.print(f'Storage Backend {backend} failed: {e}') - find_df = pd.DataFrame(find_by_vector_values) +storage_backends.remove(('sqlite', None)) find_df.index = [backend for backend, _ in storage_backends] -find_df = find_df.drop(['sqlite']) print(find_df) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) @@ -260,7 +260,7 @@ def recall(predicted, relevant, eval_at): rot=0, ) ax1.set_ylabel('seconds', fontsize=18) -ax1.set_title('Find by vector per backend and dataset size', fontsize=18) +ax1.set_title('Find by vector per backend', fontsize=18) threshold = 0.3 ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--') @@ -281,11 +281,11 @@ def recall(predicted, relevant, eval_at): ) ax2.set_ylabel('seconds', fontsize=18) -ax2.set_title('Indexing per backend and dataset size', 
fontsize=18) +ax2.set_title('Indexing per backend', fontsize=18) plt.tight_layout() -ax1.legend(fontsize=15) -ax2.legend(fontsize=15) +#ax1.legend(fontsize=15) +#ax2.legend(fontsize=15) plt.savefig('benchmark.svg') console.print(table) From c5d7f36bb663a967d4724769a137032ee1b57585 Mon Sep 17 00:00:00 2001 From: davidbp Date: Tue, 26 Apr 2022 07:55:40 +0200 Subject: [PATCH 13/33] fix: delete docarray for once finished --- scripts/benchmarking_sift_1M.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py index 4dbab944477..c9da1174a37 100644 --- a/scripts/benchmarking_sift_1M.py +++ b/scripts/benchmarking_sift_1M.py @@ -241,6 +241,8 @@ def recall(predicted, relevant, eval_at): find_by_vector_values[str(n_index)].append(find_by_vector_time) create_values[str(n_index)].append(create_time) console.print(table) + da.clear() + del da except Exception as e: console.print(f'Storage Backend {backend} failed: {e}') From 250e5ad267b502eefbaf8e0ce40f7b754998ad5a Mon Sep 17 00:00:00 2001 From: jina Date: Tue, 26 Apr 2022 09:56:53 +0200 Subject: [PATCH 14/33] fix: correctly drop sqlite --- scripts/benchmarking_sift_1M.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py index c9da1174a37..7aa07519173 100644 --- a/scripts/benchmarking_sift_1M.py +++ b/scripts/benchmarking_sift_1M.py @@ -1,3 +1,4 @@ + import argparse import functools import random @@ -19,11 +20,12 @@ TENSOR_SHAPE = (512, 256) K = 10 np.random.seed(123) -DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5') +DATASET_PATH = './sift-128-euclidean.hdf5' dataset = h5py.File(DATASET_PATH, 'r') X_tr = dataset['train'] X_te = dataset['test'] + n_index_values = [len(X_tr)] n_vector_queries = len(X_te) @@ -247,9 +249,9 @@ def recall(predicted, relevant, eval_at): console.print(f'Storage Backend {backend} failed: {e}') find_df = 
pd.DataFrame(find_by_vector_values) -storage_backends.remove(('sqlite', None)) find_df.index = [backend for backend, _ in storage_backends] print(find_df) +find_df = find_df.drop(['sqlite']) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) find_df.plot( From 1aa82594db5522003586852eff1348eea4256d99 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 26 Apr 2022 13:11:11 +0200 Subject: [PATCH 15/33] feat: speedup benchmark from groundtruth --- scripts/benchmarking_dataset.py | 232 ++++++++++++++++++++++++++++++++ scripts/benchmarking_utils.py | 157 +++++++++++++++++++++ 2 files changed, 389 insertions(+) create mode 100644 scripts/benchmarking_dataset.py create mode 100644 scripts/benchmarking_utils.py diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py new file mode 100644 index 00000000000..b309ad840aa --- /dev/null +++ b/scripts/benchmarking_dataset.py @@ -0,0 +1,232 @@ +import argparse +import random + +import pandas as pd + +from docarray import DocumentArray +from rich.console import Console +from rich.table import Table +import h5py +import numpy as np + +from benchmarking_utils import ( + create, + read, + update, + delete, + find_by_condition, + find_by_vector, + get_docs, + fmt, + recall_from_numpy, + save_benchmark_df, + plot_results, +) + + +def get_configuration_storage_backends(argparse): + parser = argparse.ArgumentParser() + parser.add_argument( + '--default-hnsw', + help='Whether to use default HNSW configurations', + action='store_true', + ) + + args = parser.parse_args() + + if args.default_hnsw: + storage_backends = [ + ('memory', None), + ('sqlite', None), + ( + 'annlite', + {'n_dim': D}, + ), + ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}), + ('weaviate', {'n_dim': D}), + ('elasticsearch', {'n_dim': D}), + ] + else: + storage_backends = [ + ('memory', None), + ('sqlite', None), + ( + 'annlite', + { + 'n_dim': D, + 'ef_construction': 100, + 'ef_search': 100, + 'max_connection': 16, + }, + ), + ( + 
'qdrant', + {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16}, + ), + ( + 'weaviate', + {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, + ), + ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}), + ] + return storage_backends + + +def run_benchmark( + X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends +): + table = Table( + title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' + ) + benchmark_df = pd.DataFrame( + { + 'Storage Backend': [], + 'Indexing time (C)': [], + 'Query (R)': [], + 'Update (U)': [], + 'Delete (D)': [], + 'Find by vector': [], + f'Recall at k={K} for vector search': [], + 'Find by condition': [], + } + ) + + for col in benchmark_df.columns: + table.add_column(col) + + console = Console() + find_by_vector_values = {str(n_index): [] for n_index in n_index_values} + create_values = {str(n_index): [] for n_index in n_index_values} + + console.print(f'Reading dataset') + docs = get_docs(X_tr) + docs_to_delete = random.sample(docs, n_query) + docs_to_update = random.sample(docs, n_query) + vector_queries = [x for x in X_te] + ground_truth = [] + + for idx, n_index in enumerate(n_index_values): + for backend, config in storage_backends: + try: + console.print('\nBackend:', backend.title()) + # for n_i in n_index: + if not config: + da = DocumentArray(storage=backend) + else: + da = DocumentArray(storage=backend, config=config) + console.print(f'\tindexing {n_index} docs ...') + create_time, _ = create(da, docs) + # for n_q in n_query: + console.print(f'\treading {n_query} docs ...') + read_time, _ = read( + da, + random.sample([d.id for d in docs], n_query), + ) + console.print(f'\tupdating {n_query} docs ...') + update_time, _ = update(da, docs_to_update) + console.print(f'\tdeleting {n_query} docs ...') + delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + console.print( + f'\tfinding {n_query} docs by vector averaged 
{n_vector_queries} times ...' + ) + if backend == 'memory': + find_by_vector_time, _ = find_by_vector( + da, vector_queries[0], limit=K + ) + ground_truth = [ + x for x in dataset['neighbors'][0 : len(vector_queries)] + ] + recall_at_k = 1 + elif backend == 'sqlite': + find_by_vector_time, result = find_by_vector( + da, vector_queries[0], limit=K + ) + recall_at_k = 1 + else: + recall_at_k_values = [] + find_by_vector_times = [] + for i, query in enumerate(vector_queries): + find_by_vector_time, results = find_by_vector( + da, query, limit=K + ) + find_by_vector_times.append(find_by_vector_time) + recall_at_k_values.append( + recall_from_numpy( + np.array(results[:, 'tags__i']), ground_truth[i], K + ) + ) + + recall_at_k = np.mean(recall_at_k_values) + find_by_vector_time = np.mean(find_by_vector_times) + + console.print(f'\tfinding {n_query} docs by condition ...') + find_by_condition_time, _ = find_by_condition( + da, {'tags__i': {'$eq': 0}} + ) + if idx == len(n_index_values) - 1: + table.add_row( + backend.title(), + fmt(create_time, 's'), + fmt(read_time * 1000, 'ms'), + fmt(update_time * 1000, 'ms'), + fmt(delete_time * 1000, 'ms'), + fmt(find_by_vector_time, 's'), + '{:.3f}'.format(recall_at_k), + fmt(find_by_condition_time, 's'), + ) + benchmark_df.append( + pd.DataFrame( + [ + [ + backend.title(), + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] + ], + columns=benchmark_df.columns, + ) + ) + + # store find_by_vector time + find_by_vector_values[str(n_index)].append(find_by_vector_time) + create_values[str(n_index)].append(create_time) + console.print(table) + da.clear() + del da + except Exception as e: + console.print(f'Storage Backend {backend} failed: {e}') + + console.print(table) + return find_by_vector_values, create_values, benchmark_df + + +if __name__ == "__main__": + + # Parameters settable by the user + n_query = 1 + K = 10 + DATASET_PATH = 'sift-128-euclidean.hdf5' + 
np.random.seed(123) + + # Variables gathered from the dataset + dataset = h5py.File(DATASET_PATH, 'r') + X_tr = dataset['train'] + X_te = dataset['test'] + D = X_tr.shape[1] + n_index_values = [len(X_tr)] + n_vector_queries = len(X_te) + + # Benchmark + storage_backends = get_configuration_storage_backends(argparse) + find_by_vector_values, create_values, benchmark_df = run_benchmark( + X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends + ) + plot_results( + find_by_vector_values, storage_backends, create_values, plot_legend=False + ) + save_benchmark_df(benchmark_df) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py new file mode 100644 index 00000000000..a1357d0932a --- /dev/null +++ b/scripts/benchmarking_utils.py @@ -0,0 +1,157 @@ +import functools +from time import perf_counter +from docarray import Document +import numpy as np +import seaborn as sns +import matplotlib.pyplot as plt +import pandas as pd + + +def timer(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + start = perf_counter() + res = func(*args, **kwargs) + return (perf_counter() - start, res) + + return wrapper + + +@timer +def create(da, docs): + da.extend(docs) + + +@timer +def read(da, ids): + da[ids] + + +@timer +def update(da, docs): + da[[d.id for d in docs]] = docs + + +@timer +def delete(da, ids): + del da[ids] + + +@timer +def find_by_condition(da, query): + da.find(query) + + +@timer +def find_by_vector(da, query, limit): + return da.find(query, limit=limit) + + +def get_docs(X_tr): + return [ + Document( + embedding=x, + # tensor=np.random.rand(*tensor_shape), + tags={'i': int(i)}, + ) + for i, x in enumerate(X_tr) + ] + + +def fmt(value, unit): + return '{:.3f} {}'.format(value, unit) + + +def recall(predicted, relevant, eval_at): + if eval_at == 0: + return 0.0 + predicted_at_k = predicted[:eval_at] + n_predicted_and_relevant = len( + set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id'])) + ) + return 
n_predicted_and_relevant / len(relevant) + + +def recall_from_numpy(predicted: np.ndarray, relevant: np.ndarray, eval_at: int): + if eval_at == 0: + return 0.0 + predicted_at_k = predicted[:eval_at] + n_predicted_and_relevant = len(set(predicted_at_k).intersection(set(relevant))) + return n_predicted_and_relevant / len(relevant) + + +def save_benchmark_df(benchmark_df): + benchmark_df.to_csv('benchmark-seconds.csv') + + benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply( + lambda value: 1_000_000 / value + ) + benchmark_df['Query (R)'] = benchmark_df['Query (R)'].apply(lambda value: 1 / value) + benchmark_df['Update (U)'] = benchmark_df['Update (U)'].apply( + lambda value: 1 / value + ) + benchmark_df['Delete (D)'] = benchmark_df['Delete (D)'].apply( + lambda value: 1 / value + ) + benchmark_df['Find by vector'] = benchmark_df['Find by vector'].apply( + lambda value: 1 / value + ) + benchmark_df['Find by condition'] = benchmark_df['Find by condition'].apply( + lambda value: 1 / value + ) + + benchmark_df.to_csv('benchmark-qps.csv') + + +def plot_results( + find_by_vector_values, storage_backends, create_values, plot_legend=True +): + find_df = pd.DataFrame(find_by_vector_values) + find_df.index = [backend for backend, _ in storage_backends] + find_df = find_df.drop(['sqlite']) + + print('\n\nQuery times') + print(find_df) + + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) + + find_df.plot( + kind="bar", + ax=ax1, + fontsize=16, + color=sns.color_palette('muted')[1:4], + # title='Find by vector per backend and dataset size', + # ylabel='seconds', + rot=0, + legend=plot_legend, + ) + ax1.set_ylabel('seconds', fontsize=18) + ax1.set_title('Find by vector per backend', fontsize=18) + + threshold = 0.3 + ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--') + + create_df = pd.DataFrame(create_values) + create_df.index = [backend for backend, _ in storage_backends] + + create_df = create_df.drop(['memory']) + 
print('\n\nIndexing times') + print(create_df) + create_df.plot( + kind="bar", + ax=ax2, + fontsize=16, + color=sns.color_palette('muted')[1:4], + # title='Indexing per backend and dataset size', + # ylabel='seconds', + rot=0, + legend=plot_legend, + ) + + ax2.set_ylabel('seconds', fontsize=18) + ax2.set_title('Indexing per backend', fontsize=18) + + plt.tight_layout() + # ax1.legend(fontsize=15) + # ax2.legend(fontsize=15) + plt.savefig('benchmark.svg') From cabb2f4eb57b6bd24d429134629fff0a007ab3bd Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 26 Apr 2022 13:16:34 +0200 Subject: [PATCH 16/33] refactor: sift1M --- scripts/benchmarking_sift_1M.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py index 7aa07519173..aeaf57f6ff2 100644 --- a/scripts/benchmarking_sift_1M.py +++ b/scripts/benchmarking_sift_1M.py @@ -1,4 +1,3 @@ - import argparse import functools import random @@ -23,8 +22,8 @@ DATASET_PATH = './sift-128-euclidean.hdf5' dataset = h5py.File(DATASET_PATH, 'r') -X_tr = dataset['train'] -X_te = dataset['test'] +X_tr = dataset['train'][0:1000] +X_te = dataset['test'][0:100] n_index_values = [len(X_tr)] n_vector_queries = len(X_te) @@ -288,8 +287,8 @@ def recall(predicted, relevant, eval_at): ax2.set_title('Indexing per backend', fontsize=18) plt.tight_layout() -#ax1.legend(fontsize=15) -#ax2.legend(fontsize=15) +# ax1.legend(fontsize=15) +# ax2.legend(fontsize=15) plt.savefig('benchmark.svg') console.print(table) From 09518ff247bda44b030d4bb1126d8e1823a6acf6 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Tue, 26 Apr 2022 17:16:04 +0200 Subject: [PATCH 17/33] fix: recall bug --- scripts/benchmarking_dataset.py | 30 +++++++++++++++++++++++------- scripts/benchmarking_utils.py | 16 +++++++++++++++- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py index 
b309ad840aa..6a7cc4d581f 100644 --- a/scripts/benchmarking_dataset.py +++ b/scripts/benchmarking_dataset.py @@ -37,7 +37,6 @@ def get_configuration_storage_backends(argparse): if args.default_hnsw: storage_backends = [ ('memory', None), - ('sqlite', None), ( 'annlite', {'n_dim': D}, @@ -45,11 +44,11 @@ def get_configuration_storage_backends(argparse): ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}), ('weaviate', {'n_dim': D}), ('elasticsearch', {'n_dim': D}), + ('sqlite', None), ] else: storage_backends = [ ('memory', None), - ('sqlite', None), ( 'annlite', { @@ -68,12 +67,21 @@ def get_configuration_storage_backends(argparse): {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, ), ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}), + ('sqlite', None), ] return storage_backends def run_benchmark( - X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends + X_tr, + X_te, + dataset, + n_index_values, + n_vector_queries, + n_query, + storage_backends, + K, + D, ): table = Table( title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' @@ -130,11 +138,11 @@ def run_benchmark( f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
                )
                if backend == 'memory':
-                    find_by_vector_time, _ = find_by_vector(
+                    find_by_vector_time, aux = find_by_vector(
                         da, vector_queries[0], limit=K
                     )
                     ground_truth = [
-                        x for x in dataset['neighbors'][0 : len(vector_queries)]
+                        x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]
                     ]
                     recall_at_k = 1
                 elif backend == 'sqlite':
                     find_by_vector_time, result = find_by_vector(
@@ -152,7 +160,7 @@
                         find_by_vector_times.append(find_by_vector_time)
                         recall_at_k_values.append(
                             recall_from_numpy(
-                                np.array(results[:, 'tags__i']), ground_truth[i], K
+                                np.array(results[:, 'tags__i']), ground_truth[i][0:K], K
                             )
                         )
diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py
index a1357d0932a..dbb4e33a12f 100644
--- a/scripts/benchmarking_utils.py
+++ b/scripts/benchmarking_utils.py
@@ -1,5 +1,7 @@
 import functools
 from time import perf_counter
+from typing import Iterable
+
 from docarray import Document
 import numpy as np
 import seaborn as sns
 import matplotlib.pyplot as plt
 import pandas as pd
@@ -72,7 +74,19 @@ def recall(predicted, relevant, eval_at):
     return n_predicted_and_relevant / len(relevant)


-def recall_from_numpy(predicted: np.ndarray, relevant: np.ndarray, eval_at: int):
+def recall_from_numpy(predicted, relevant, eval_at: int):
+    """
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],5)
+    1.0
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],4)
+    0.8
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],3)
+    0.6
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],2)
+    0.4
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],1)
+    0.2
+    """
     if eval_at == 0:
         return 0.0
     predicted_at_k = 
predicted[:eval_at] From 8f6c375a7f7dc8642d30bd92dbd527d73549f254 Mon Sep 17 00:00:00 2001 From: David Buchaca Prats Date: Wed, 27 Apr 2022 17:39:05 +0200 Subject: [PATCH 18/33] refactor: get groundtruth before --- scripts/benchmarking_dataset.py | 21 +++++++++------------ scripts/benchmarking_utils.py | 1 - 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py index 6a7cc4d581f..c7e218085b2 100644 --- a/scripts/benchmarking_dataset.py +++ b/scripts/benchmarking_dataset.py @@ -36,19 +36,22 @@ def get_configuration_storage_backends(argparse): if args.default_hnsw: storage_backends = [ - ('memory', None), + ('weaviate', {'n_dim': D}), ( 'annlite', {'n_dim': D}, ), ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}), - ('weaviate', {'n_dim': D}), ('elasticsearch', {'n_dim': D}), ('sqlite', None), + ('memory', None), ] else: storage_backends = [ - ('memory', None), + ( + 'weaviate', + {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, + ), ( 'annlite', { @@ -62,12 +65,9 @@ def get_configuration_storage_backends(argparse): 'qdrant', {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16}, ), - ( - 'weaviate', - {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, - ), ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}), ('sqlite', None), + ('memory', None), ] return storage_backends @@ -111,7 +111,7 @@ def run_benchmark( docs_to_delete = random.sample(docs, n_query) docs_to_update = random.sample(docs, n_query) vector_queries = [x for x in X_te] - ground_truth = [] + ground_truth = [x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]] for idx, n_index in enumerate(n_index_values): for backend, config in storage_backends: @@ -141,9 +141,6 @@ def run_benchmark( find_by_vector_time, aux = find_by_vector( da, vector_queries[0], limit=K ) - ground_truth = [ - x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)] - ] 
recall_at_k = 1 elif backend == 'sqlite': find_by_vector_time, result = find_by_vector( @@ -160,7 +157,7 @@ def run_benchmark( find_by_vector_times.append(find_by_vector_time) recall_at_k_values.append( recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[i][0:K], K + np.array(results[:, 'tags__i']), ground_truth[i], K ) ) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index dbb4e33a12f..386d9061bc3 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -123,7 +123,6 @@ def plot_results( find_df = pd.DataFrame(find_by_vector_values) find_df.index = [backend for backend, _ in storage_backends] find_df = find_df.drop(['sqlite']) - print('\n\nQuery times') print(find_df) From e294afb5130f4dbf2f4582aa91af65e2c0e7aeb0 Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 12 Oct 2022 21:34:10 +0800 Subject: [PATCH 19/33] fix: backend config and filters --- scripts/benchmarking_dataset.py | 106 +++++++++++++++++++++++++++----- 1 file changed, 91 insertions(+), 15 deletions(-) diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py index c7e218085b2..6d351263f35 100644 --- a/scripts/benchmarking_dataset.py +++ b/scripts/benchmarking_dataset.py @@ -36,22 +36,52 @@ def get_configuration_storage_backends(argparse): if args.default_hnsw: storage_backends = [ - ('weaviate', {'n_dim': D}), + ('memory', None), + ('sqlite', None), ( 'annlite', - {'n_dim': D}, + { + 'n_dim': D, + 'columns': {'i': 'int'}, + }, + ), + ( + 'qdrant', + { + 'n_dim': D, + 'scroll_batch_size': 8, + 'port': '41233', + }, ), - ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}), - ('elasticsearch', {'n_dim': D}), - ('sqlite', None), - ('memory', None), - ] - else: - storage_backends = [ ( 'weaviate', - {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, + { + 'n_dim': D, + 'port': '41234', + 'columns': {'i': 'int'}, + }, ), + ( + 'elasticsearch', + { + 'n_dim': D, + 'hosts': 'http://localhost:41235', + 
'columns': {'i': 'int'}, + }, + ), + ( + 'redis', + { + 'n_dim': D, + 'port': '41236', + 'columns': {'i': 'int'}, + }, + ), + ] + else: + storage_backends = [ + ('memory', None), + ('sqlite', None), ( 'annlite', { @@ -59,19 +89,65 @@ def get_configuration_storage_backends(argparse): 'ef_construction': 100, 'ef_search': 100, 'max_connection': 16, + 'columns': {'i': 'int'}, }, ), ( 'qdrant', - {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16}, + { + 'n_dim': D, + 'scroll_batch_size': 8, + 'ef_construct': 100, + 'm': 16, + 'port': '41233', + }, + ), + ( + 'weaviate', + { + 'n_dim': D, + 'ef': 100, + 'ef_construction': 100, + 'max_connections': 16, + 'port': '41234', + 'columns': {'i': 'int'}, + }, + ), + ( + 'elasticsearch', + { + 'n_dim': D, + 'ef_construction': 100, + 'm': 16, + 'hosts': 'http://localhost:41235', + 'columns': {'i': 'int'}, + }, + ), + ( + 'redis', + { + 'n_dim': D, + 'ef_construction': 100, + 'm': 16, + 'ef_runtime': 100, + 'port': '41236', + }, ), - ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}), - ('sqlite', None), - ('memory', None), ] return storage_backends +storage_backend_filters = { + 'memory': {'tags__i': {'$eq': 0}}, + 'sqlite': {'tags__i': {'$eq': 0}}, + 'annlite': {'i': {'$eq': 0}}, + 'qdrant': {'tags__i': {'$eq': 0}}, + 'weaviate': {'path': 'i', 'operator': 'Equal', 'valueInt': 0}, + 'elasticsearch': {'match': {'i': 0}}, + 'redis': {'i': {'$eq': 0}}, +} + + def run_benchmark( X_tr, X_te, @@ -166,7 +242,7 @@ def run_benchmark( console.print(f'\tfinding {n_query} docs by condition ...') find_by_condition_time, _ = find_by_condition( - da, {'tags__i': {'$eq': 0}} + da, storage_backend_filters[backend] ) if idx == len(n_index_values) - 1: table.add_row( From b3259b1cd1fc99f1993ae33b01af65c1f623a042 Mon Sep 17 00:00:00 2001 From: AnneY Date: Thu, 13 Oct 2022 11:30:17 +0800 Subject: [PATCH 20/33] fix: ignore drop error --- scripts/benchmarking_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) 
diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index 386d9061bc3..67468a32ff3 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -122,7 +122,7 @@ def plot_results( ): find_df = pd.DataFrame(find_by_vector_values) find_df.index = [backend for backend, _ in storage_backends] - find_df = find_df.drop(['sqlite']) + find_df = find_df.drop(['sqlite'], errors='ignore') print('\n\nQuery times') print(find_df) From dc27a6487f3d91deb7f5844c6dfebec87ef5a1a1 Mon Sep 17 00:00:00 2001 From: AnneY Date: Thu, 13 Oct 2022 11:34:56 +0800 Subject: [PATCH 21/33] refactor: adjust funcs to be same as random --- scripts/benchmarking_dataset.py | 57 +++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py index 6d351263f35..f6dd99f7aca 100644 --- a/scripts/benchmarking_dataset.py +++ b/scripts/benchmarking_dataset.py @@ -32,6 +32,12 @@ def get_configuration_storage_backends(argparse): action='store_true', ) + parser.add_argument( + '--exclude-backends', + help='list of comma separated backends to exclude from the benchmarks', + type=str, + ) + args = parser.parse_args() if args.default_hnsw: @@ -134,6 +140,12 @@ def get_configuration_storage_backends(argparse): }, ), ] + + storage_backends = [ + (backend, config) + for backend, config in storage_backends + if backend not in (args.exclude_backends or '').split(',') + ] return storage_backends @@ -198,31 +210,41 @@ def run_benchmark( da = DocumentArray(storage=backend) else: da = DocumentArray(storage=backend, config=config) + console.print(f'\tindexing {n_index} docs ...') create_time, _ = create(da, docs) + # for n_q in n_query: console.print(f'\treading {n_query} docs ...') read_time, _ = read( da, random.sample([d.id for d in docs], n_query), ) + console.print(f'\tupdating {n_query} docs ...') update_time, _ = update(da, docs_to_update) + console.print(f'\tdeleting {n_query} 
docs ...') delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + console.print( f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' ) if backend == 'memory': - find_by_vector_time, aux = find_by_vector( + find_by_vector_time, results = find_by_vector( da, vector_queries[0], limit=K ) - recall_at_k = 1 + recall_at_k = recall_from_numpy( + np.array(results[:, 'tags__i']), ground_truth[0], K + ) elif backend == 'sqlite': find_by_vector_time, result = find_by_vector( da, vector_queries[0], limit=K ) - recall_at_k = 1 + # recall_at_k = 1 + recall_at_k = recall_from_numpy( + np.array(results[:, 'tags__i']), ground_truth[0], K + ) else: recall_at_k_values = [] find_by_vector_times = [] @@ -244,6 +266,7 @@ def run_benchmark( find_by_condition_time, _ = find_by_condition( da, storage_backend_filters[backend] ) + if idx == len(n_index_values) - 1: table.add_row( backend.title(), @@ -255,28 +278,20 @@ def run_benchmark( '{:.3f}'.format(recall_at_k), fmt(find_by_condition_time, 's'), ) - benchmark_df.append( - pd.DataFrame( - [ - [ - backend.title(), - create_time, - read_time, - update_time, - delete_time, - find_by_vector_time, - recall_at_k, - find_by_condition_time, - ] - ], - columns=benchmark_df.columns, - ) - ) + benchmark_df.loc[len(benchmark_df.index)] = [ + backend.title(), + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] # store find_by_vector time find_by_vector_values[str(n_index)].append(find_by_vector_time) create_values[str(n_index)].append(create_time) - console.print(table) da.clear() del da except Exception as e: From c96273c735c689fc38c2c69f3ddca9d432d05f8d Mon Sep 17 00:00:00 2001 From: AnneY Date: Thu, 13 Oct 2022 15:49:15 +0800 Subject: [PATCH 22/33] fix: fix elasticsearch connection timeout --- scripts/benchmarking_dataset.py | 1 + 1 file changed, 1 insertion(+) diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py index 
f6dd99f7aca..2765dd465bd 100644 --- a/scripts/benchmarking_dataset.py +++ b/scripts/benchmarking_dataset.py @@ -73,6 +73,7 @@ def get_configuration_storage_backends(argparse): 'n_dim': D, 'hosts': 'http://localhost:41235', 'columns': {'i': 'int'}, + 'es_config': {'timeout': 300}, }, ), ( From 0b2c814f8ce4fbe5a2fa4f5b71a1eb43f3b6f008 Mon Sep 17 00:00:00 2001 From: AnneY Date: Thu, 13 Oct 2022 22:26:52 +0800 Subject: [PATCH 23/33] refactor: remove duplicated code --- ...king_dataset.py => benchmarking_sift1m.py} | 0 scripts/benchmarking_sift_1M.py | 311 ------------------ 2 files changed, 311 deletions(-) rename scripts/{benchmarking_dataset.py => benchmarking_sift1m.py} (100%) delete mode 100644 scripts/benchmarking_sift_1M.py diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_sift1m.py similarity index 100% rename from scripts/benchmarking_dataset.py rename to scripts/benchmarking_sift1m.py diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py deleted file mode 100644 index aeaf57f6ff2..00000000000 --- a/scripts/benchmarking_sift_1M.py +++ /dev/null @@ -1,311 +0,0 @@ -import argparse -import functools -import random -from time import perf_counter - -import numpy as np -import pandas as pd -import seaborn as sns -import matplotlib.pyplot as plt - -from docarray import Document, DocumentArray -from rich.console import Console -from rich.table import Table -import h5py -import os - -n_query = 1 -D = 128 -TENSOR_SHAPE = (512, 256) -K = 10 -np.random.seed(123) -DATASET_PATH = './sift-128-euclidean.hdf5' -dataset = h5py.File(DATASET_PATH, 'r') - -X_tr = dataset['train'][0:1000] -X_te = dataset['test'][0:100] - -n_index_values = [len(X_tr)] -n_vector_queries = len(X_te) - -parser = argparse.ArgumentParser() -parser.add_argument( - '--default-hnsw', - help='Whether to use default HNSW configurations', - action='store_true', -) -args = parser.parse_args() - -times = {} - - -def timer(func): - @functools.wraps(func) - def 
wrapper(*args, **kwargs): - start = perf_counter() - res = func(*args, **kwargs) - return (perf_counter() - start, res) - - return wrapper - - -@timer -def create(da, docs): - da.extend(docs) - - -@timer -def read(da, ids): - da[ids] - - -@timer -def update(da, docs): - da[[d.id for d in docs]] = docs - - -@timer -def delete(da, ids): - del da[ids] - - -@timer -def find_by_condition(da, query): - da.find(query) - - -@timer -def find_by_vector(da, query): - return da.find(query, limit=K) - - -def get_docs(X_tr): - return [ - Document( - embedding=x, - # tensor=np.random.rand(*tensor_shape), - tags={'i': int(i)}, - ) - for i, x in enumerate(X_tr) - ] - - -def fmt(value, unit): - return '{:.3f} {}'.format(value, unit) - - -def recall(predicted, relevant, eval_at): - if eval_at == 0: - return 0.0 - predicted_at_k = predicted[:eval_at] - n_predicted_and_relevant = len( - set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id'])) - ) - return n_predicted_and_relevant / len(relevant) - - -if args.default_hnsw: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - {'n_dim': D}, - ), - ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}), - ('weaviate', {'n_dim': D}), - ('elasticsearch', {'n_dim': D}), - ] -else: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, - 'ef_construction': 100, - 'ef_search': 100, - 'max_connection': 16, - }, - ), - ('qdrant', {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16}), - ( - 'weaviate', - {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16}, - ), - ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}), - ] - -table = Table( - title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' -) -benchmark_df = pd.DataFrame( - { - 'Storage Backend': [], - 'Indexing time (C)': [], - 'Query (R)': [], - 'Update (U)': [], - 'Delete (D)': [], - 'Find by vector': [], - f'Recall at k={K} for vector 
search': [], - 'Find by condition': [], - } -) - -for col in benchmark_df.columns: - table.add_column(col) - -console = Console() -find_by_vector_values = {str(n_index): [] for n_index in n_index_values} -create_values = {str(n_index): [] for n_index in n_index_values} - -console.print(f'Reading dataset') -docs = get_docs(X_tr) -docs_to_delete = random.sample(docs, n_query) -docs_to_update = random.sample(docs, n_query) -vector_queries = [x for x in X_te] -ground_truth = [] - -for idx, n_index in enumerate(n_index_values): - for backend, config in storage_backends: - try: - console.print('\nBackend:', backend.title()) - # for n_i in n_index: - if not config: - da = DocumentArray(storage=backend) - else: - da = DocumentArray(storage=backend, config=config) - console.print(f'\tindexing {n_index} docs ...') - create_time, _ = create(da, docs) - # for n_q in n_query: - console.print(f'\treading {n_query} docs ...') - read_time, _ = read( - da, - random.sample([d.id for d in docs], n_query), - ) - console.print(f'\tupdating {n_query} docs ...') - update_time, _ = update(da, docs_to_update) - console.print(f'\tdeleting {n_query} docs ...') - delete_time, _ = delete(da, [d.id for d in docs_to_delete]) - console.print( - f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
- ) - if backend == 'sqlite': - find_by_vector_time, result = find_by_vector(da, vector_queries[0]) - recall_at_k = recall(result, ground_truth[0], K) - else: - recall_at_k_values = [] - find_by_vector_times = [] - for i, query in enumerate(vector_queries): - find_by_vector_time, results = find_by_vector(da, query) - find_by_vector_times.append(find_by_vector_time) - if backend == 'memory': - ground_truth.append(results) - recall_at_k_values.append(1) - else: - recall_at_k_values.append(recall(results, ground_truth[i], K)) - recall_at_k = sum(recall_at_k_values) / len(recall_at_k_values) - find_by_vector_time = sum(find_by_vector_times) / len( - find_by_vector_times - ) - console.print(f'\tfinding {n_query} docs by condition ...') - find_by_condition_time, _ = find_by_condition(da, {'tags__i': {'$eq': 0}}) - if idx == len(n_index_values) - 1: - table.add_row( - backend.title(), - fmt(create_time, 's'), - fmt(read_time * 1000, 'ms'), - fmt(update_time * 1000, 'ms'), - fmt(delete_time * 1000, 'ms'), - fmt(find_by_vector_time, 's'), - '{:.3f}'.format(recall_at_k), - fmt(find_by_condition_time, 's'), - ) - benchmark_df.append( - pd.DataFrame( - [ - [ - backend.title(), - create_time, - read_time, - update_time, - delete_time, - find_by_vector_time, - recall_at_k, - find_by_condition_time, - ] - ], - columns=benchmark_df.columns, - ) - ) - find_by_vector_values[str(n_index)].append(find_by_vector_time) - create_values[str(n_index)].append(create_time) - console.print(table) - da.clear() - del da - except Exception as e: - console.print(f'Storage Backend {backend} failed: {e}') - -find_df = pd.DataFrame(find_by_vector_values) -find_df.index = [backend for backend, _ in storage_backends] -print(find_df) -find_df = find_df.drop(['sqlite']) -fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) - -find_df.plot( - kind="bar", - ax=ax1, - fontsize=16, - color=sns.color_palette('muted')[1:4], - # title='Find by vector per backend and dataset size', - # ylabel='seconds', - 
rot=0, -) -ax1.set_ylabel('seconds', fontsize=18) -ax1.set_title('Find by vector per backend', fontsize=18) - -threshold = 0.3 -ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--') - -create_df = pd.DataFrame(create_values) -create_df.index = [backend for backend, _ in storage_backends] - -create_df = create_df.drop(['memory']) -print(create_df) -create_df.plot( - kind="bar", - ax=ax2, - fontsize=16, - color=sns.color_palette('muted')[1:4], - # title='Indexing per backend and dataset size', - # ylabel='seconds', - rot=0, -) - -ax2.set_ylabel('seconds', fontsize=18) -ax2.set_title('Indexing per backend', fontsize=18) - -plt.tight_layout() -# ax1.legend(fontsize=15) -# ax2.legend(fontsize=15) - -plt.savefig('benchmark.svg') -console.print(table) - -benchmark_df.to_csv('benchmark-seconds.csv') - -benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply( - lambda value: 1_000_000 / value -) -benchmark_df['Query (R)'] = benchmark_df['Query (R)'].apply(lambda value: 1 / value) -benchmark_df['Update (U)'] = benchmark_df['Update (U)'].apply(lambda value: 1 / value) -benchmark_df['Delete (D)'] = benchmark_df['Delete (D)'].apply(lambda value: 1 / value) -benchmark_df['Find by vector'] = benchmark_df['Find by vector'].apply( - lambda value: 1 / value -) -benchmark_df['Find by condition'] = benchmark_df['Find by condition'].apply( - lambda value: 1 / value -) - -benchmark_df.to_csv('benchmark-qps.csv') From 08bbee328225fbc98c3a2a35f69082882356f0b7 Mon Sep 17 00:00:00 2001 From: AnneY Date: Mon, 17 Oct 2022 16:39:44 +0800 Subject: [PATCH 24/33] refactor: reuse code --- scripts/benchmarking.py | 437 ++++------------------------------ scripts/benchmarking_utils.py | 423 +++++++++++++++++++++++++++++++- 2 files changed, 462 insertions(+), 398 deletions(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index eeb1fbbd294..a586a30e0b5 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -1,401 +1,52 @@ 
import argparse -import functools -import random -from time import perf_counter import numpy as np -import pandas as pd -import seaborn as sns -import matplotlib.pyplot as plt -from docarray import Document, DocumentArray -from rich.console import Console -from rich.table import Table - -np.random.seed(123) - -n_index_values = [1_000_000] -n_query = 1 -D = 128 -TENSOR_SHAPE = (512, 256) -K = 10 -n_vector_queries = 1000 -np.random.seed(123) - -parser = argparse.ArgumentParser() -parser.add_argument( - '--default-hnsw', - help='Whether to use default HNSW configurations', - action='store_true', -) - -parser.add_argument( - '--exclude-backends', - help='list of comma separated backends to exclude from the benchmarks', - type=str, -) -args = parser.parse_args() - -times = {} - - -def timer(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - start = perf_counter() - res = func(*args, **kwargs) - return (perf_counter() - start, res) - - return wrapper - - -@timer -def create(da, docs): - da.extend(docs) - - -@timer -def read(da, ids): - da[ids] - - -@timer -def update(da, docs): - da[[d.id for d in docs]] = docs - - -@timer -def delete(da, ids): - del da[ids] - - -@timer -def find_by_condition(da, query): - da.find(query) - - -@timer -def find_by_vector(da, query): - return da.find(query, limit=K) - - -def get_docs(n, n_dim, tensor_shape, n_query): - return [ - Document( - embedding=np.random.rand(n_dim), - # tensor=np.random.rand(*tensor_shape), - tags={'i': int(i / n_query)}, +from benchmarking_utils import ( + get_configuration_storage_backends, + plot_results, + run_benchmark2, + save_benchmark_df, +) + +if __name__ == "__main__": + + # Parameters settable by the user + n_index_values = [1000] + n_query = 1 + D = 128 + TENSOR_SHAPE = (512, 256) + K = 10 + n_vector_queries = 100 + np.random.seed(123) + + # Benchmark + storage_backends = get_configuration_storage_backends(argparse, D) + find_by_vector_values = {str(n_index): [] for n_index in 
n_index_values} + create_values = {str(n_index): [] for n_index in n_index_values} + + for idx, n_index in enumerate(n_index_values): + train = [np.random.rand(D) for _ in range(n_index)] + test = [np.random.rand(D) for _ in range(n_vector_queries)] + ground_truth = [] + + find_by_vector_time_all, create_time_all, benchmark_df = run_benchmark2( + train, + test, + ground_truth, + n_index, + n_vector_queries, + n_query, + storage_backends, + K, + D, ) - for i in range(n) - ] + # store find_by_vector time + find_by_vector_values[str(n_index)] = find_by_vector_time_all + create_values[str(n_index)] = create_time_all + save_benchmark_df(benchmark_df, n_index) -def fmt(value, unit): - return '{:.3f} {}'.format(value, unit) - - -def recall(predicted, relevant, eval_at): - if eval_at == 0: - return 0.0 - predicted_at_k = predicted[:eval_at] - n_predicted_and_relevant = len( - set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id'])) + plot_results( + find_by_vector_values, storage_backends, create_values, plot_legend=False ) - return n_predicted_and_relevant / len(relevant) - - -if args.default_hnsw: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, - 'columns': {'i': 'int'}, - }, - ), - ( - 'qdrant', - { - 'n_dim': D, - 'scroll_batch_size': 8, - 'port': '41233', - }, - ), - ( - 'weaviate', - { - 'n_dim': D, - 'port': '41234', - 'columns': {'i': 'int'}, - }, - ), - ( - 'elasticsearch', - { - 'n_dim': D, - 'hosts': 'http://localhost:41235', - 'columns': {'i': 'int'}, - }, - ), - ( - 'redis', - { - 'n_dim': D, - 'port': '41236', - 'columns': {'i': 'int'}, - }, - ), - ] -else: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, - 'ef_construction': 100, - 'ef_search': 100, - 'max_connection': 16, - 'columns': {'i': 'int'}, - }, - ), - ( - 'qdrant', - { - 'n_dim': D, - 'scroll_batch_size': 8, - 'ef_construct': 100, - 'm': 16, - 'port': '41233', - }, - ), - ( - 'weaviate', - { 
- 'n_dim': D, - 'ef': 100, - 'ef_construction': 100, - 'max_connections': 16, - 'port': '41234', - 'columns': {'i': 'int'}, - }, - ), - ( - 'elasticsearch', - { - 'n_dim': D, - 'ef_construction': 100, - 'm': 16, - 'hosts': 'http://localhost:41235', - 'columns': {'i': 'int'}, - }, - ), - ( - 'redis', - { - 'n_dim': D, - 'ef_construction': 100, - 'm': 16, - 'ef_runtime': 100, - 'port': '41236', - }, - ), - ] - -storage_backends = [ - (backend, config) - for backend, config in storage_backends - if backend not in (args.exclude_backends or '').split(',') -] - -storage_backend_filters = { - 'memory': {'tags__i': {'$eq': 0}}, - 'sqlite': {'tags__i': {'$eq': 0}}, - 'annlite': {'i': {'$eq': 0}}, - 'qdrant': {'tags__i': {'$eq': 0}}, - 'weaviate': {'path': 'i', 'operator': 'Equal', 'valueInt': 0}, - 'elasticsearch': {'match': {'i': 0}}, - 'redis': {'i': {'$eq': 0}}, -} - -table = Table( - title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' -) -benchmark_df = pd.DataFrame( - { - 'Storage Backend': [], - 'Indexing time (C)': [], - 'Query (R)': [], - 'Update (U)': [], - 'Delete (D)': [], - 'Find by vector': [], - f'Recall at k={K} for vector search': [], - 'Find by condition': [], - } -) - -for col in benchmark_df.columns: - table.add_column(col) - -console = Console() -find_by_vector_values = {str(n_index): [] for n_index in n_index_values} -create_values = {str(n_index): [] for n_index in n_index_values} - - -for idx, n_index in enumerate(n_index_values): - console.print(f'generating {n_index} docs...') - docs = get_docs(n_index, D, TENSOR_SHAPE, n_query) - docs_to_delete = random.sample(docs, n_query) - docs_to_update = random.sample(docs, n_query) - - vector_queries = [np.random.rand(n_query, D) for _ in range(n_vector_queries)] - ground_truth = [] - - for backend, config in storage_backends: - try: - console.print('Backend:', backend.title()) - # for n_i in n_index: - if not config: - da = DocumentArray(storage=backend) - else: - da = 
DocumentArray(storage=backend, config=config) - - console.print(f'indexing {n_index} docs ...') - create_time, _ = create(da, docs) - - # for n_q in n_query: - console.print(f'reading {n_query} docs ...') - read_time, _ = read( - da, - random.sample([d.id for d in docs], n_query), - ) - - console.print(f'updating {n_query} docs ...') - update_time, _ = update(da, docs_to_update) - - console.print(f'deleting {n_query} docs ...') - delete_time, _ = delete(da, [d.id for d in docs_to_delete]) - - console.print( - f'finding {n_query} docs by vector averaged {n_vector_queries} times ...' - ) - if backend == 'sqlite': - find_by_vector_time, result = find_by_vector( - da, vector_queries[0].squeeze() - ) - recall_at_k = recall(result, ground_truth[0], K) - else: - recall_at_k_values = [] - find_by_vector_times = [] - for i, query in enumerate(vector_queries): - find_by_vector_time, results = find_by_vector(da, query.squeeze()) - find_by_vector_times.append(find_by_vector_time) - if backend == 'memory': - ground_truth.append(results) - recall_at_k_values.append(1) - else: - recall_at_k_values.append(recall(results, ground_truth[i], K)) - - recall_at_k = sum(recall_at_k_values) / len(recall_at_k_values) - find_by_vector_time = sum(find_by_vector_times) / len( - find_by_vector_times - ) - - console.print(f'finding {n_query} docs by condition ...') - find_by_condition_time, _ = find_by_condition( - da, storage_backend_filters[backend] - ) - - if idx == len(n_index_values) - 1: - table.add_row( - backend.title(), - fmt(create_time, 's'), - fmt(read_time * 1000, 'ms'), - fmt(update_time * 1000, 'ms'), - fmt(delete_time * 1000, 'ms'), - fmt(find_by_vector_time, 's'), - '{:.3f}'.format(recall_at_k), - fmt(find_by_condition_time, 's'), - ) - benchmark_df.loc[len(benchmark_df.index)] = [ - backend.title(), - create_time, - read_time, - update_time, - delete_time, - find_by_vector_time, - recall_at_k, - find_by_condition_time, - ] - - 
find_by_vector_values[str(n_index)].append(find_by_vector_time) - create_values[str(n_index)].append(create_time) - except Exception as e: - console.print(f'Storage Backend {backend} failed') - raise e - -find_df = pd.DataFrame(find_by_vector_values) -find_df.index = [backend for backend, _ in storage_backends] -find_df = find_df.drop(['sqlite'], errors='ignore') -print(find_df) -fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) - -find_df.plot( - kind="bar", - ax=ax1, - fontsize=16, - color=sns.color_palette('muted')[1:4], - # title='Find by vector per backend and dataset size', - # ylabel='seconds', - rot=0, -) -ax1.set_ylabel('seconds', fontsize=18) -ax1.set_title('Find by vector per backend and dataset size', fontsize=18) - -threshold = 0.3 -ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--') - -create_df = pd.DataFrame(create_values) -create_df.index = [backend for backend, _ in storage_backends] - -create_df = create_df.drop(['memory']) -print(create_df) -create_df.plot( - kind="bar", - ax=ax2, - fontsize=16, - color=sns.color_palette('muted')[1:4], - # title='Indexing per backend and dataset size', - # ylabel='seconds', - rot=0, -) - -ax2.set_ylabel('seconds', fontsize=18) -ax2.set_title('Indexing per backend and dataset size', fontsize=18) - -plt.tight_layout() -ax1.legend(fontsize=15) -ax2.legend(fontsize=15) - -plt.savefig('benchmark.svg') -console.print(table) - -benchmark_df.to_csv('benchmark-seconds.csv') - -benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply( - lambda value: 1_000_000 / value -) -benchmark_df['Query (R)'] = benchmark_df['Query (R)'].apply(lambda value: 1 / value) -benchmark_df['Update (U)'] = benchmark_df['Update (U)'].apply(lambda value: 1 / value) -benchmark_df['Delete (D)'] = benchmark_df['Delete (D)'].apply(lambda value: 1 / value) -benchmark_df['Find by vector'] = benchmark_df['Find by vector'].apply( - lambda value: 1 / value -) -benchmark_df['Find by condition'] = 
benchmark_df['Find by condition'].apply( - lambda value: 1 / value -) - -benchmark_df.to_csv('benchmark-qps.csv') diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index 67468a32ff3..d94606efb08 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -2,6 +2,14 @@ from time import perf_counter from typing import Iterable +import random + +import pandas as pd + +from rich.console import Console +from rich.table import Table + +from docarray import DocumentArray from docarray import Document import numpy as np import seaborn as sns @@ -64,6 +72,143 @@ def fmt(value, unit): return '{:.3f} {}'.format(value, unit) +def get_configuration_storage_backends(argparse, D): + parser = argparse.ArgumentParser() + parser.add_argument( + '--default-hnsw', + help='Whether to use default HNSW configurations', + action='store_true', + ) + + parser.add_argument( + '--exclude-backends', + help='list of comma separated backends to exclude from the benchmarks', + type=str, + ) + + args = parser.parse_args() + + if args.default_hnsw: + storage_backends = [ + ('memory', None), + ('sqlite', None), + ( + 'annlite', + { + 'n_dim': D, + 'columns': {'i': 'int'}, + }, + ), + ( + 'qdrant', + { + 'n_dim': D, + 'scroll_batch_size': 8, + 'port': '41233', + }, + ), + ( + 'weaviate', + { + 'n_dim': D, + 'port': '41234', + 'columns': {'i': 'int'}, + }, + ), + ( + 'elasticsearch', + { + 'n_dim': D, + 'hosts': 'http://localhost:41235', + 'columns': {'i': 'int'}, + 'es_config': {'timeout': 1000}, + }, + ), + ( + 'redis', + { + 'n_dim': D, + 'port': '41236', + 'columns': {'i': 'int'}, + }, + ), + ] + else: + storage_backends = [ + ('memory', None), + ('sqlite', None), + ( + 'annlite', + { + 'n_dim': D, + 'ef_construction': 100, + 'ef_search': 100, + 'max_connection': 16, + 'columns': {'i': 'int'}, + }, + ), + ( + 'qdrant', + { + 'n_dim': D, + 'scroll_batch_size': 8, + 'ef_construct': 100, + 'm': 16, + 'port': '41233', + }, + ), + ( + 'weaviate', + { + 
'n_dim': D, + 'ef': 100, + 'ef_construction': 100, + 'max_connections': 16, + 'port': '41234', + 'columns': {'i': 'int'}, + }, + ), + ( + 'elasticsearch', + { + 'n_dim': D, + 'ef_construction': 100, + 'm': 16, + 'hosts': 'http://localhost:41235', + 'columns': {'i': 'int'}, + }, + ), + ( + 'redis', + { + 'n_dim': D, + 'ef_construction': 100, + 'm': 16, + 'ef_runtime': 100, + 'port': '41236', + }, + ), + ] + + storage_backends = [ + (backend, config) + for backend, config in storage_backends + if backend not in (args.exclude_backends or '').split(',') + ] + return storage_backends + + +storage_backend_filters = { + 'memory': {'tags__i': {'$eq': 0}}, + 'sqlite': {'tags__i': {'$eq': 0}}, + 'annlite': {'i': {'$eq': 0}}, + 'qdrant': {'tags__i': {'$eq': 0}}, + 'weaviate': {'path': 'i', 'operator': 'Equal', 'valueInt': 0}, + 'elasticsearch': {'match': {'i': 0}}, + 'redis': {'i': {'$eq': 0}}, +} + + def recall(predicted, relevant, eval_at): if eval_at == 0: return 0.0 @@ -94,8 +239,276 @@ def recall_from_numpy(predicted, relevant, eval_at: int): return n_predicted_and_relevant / len(relevant) -def save_benchmark_df(benchmark_df): - benchmark_df.to_csv('benchmark-seconds.csv') +def run_benchmark( + X_tr, + X_te, + dataset, + n_index_values, + n_vector_queries, + n_query, + storage_backends, + K, + D, +): + table = Table( + title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' + ) + benchmark_df = pd.DataFrame( + { + 'Storage Backend': [], + 'Indexing time (C)': [], + 'Query (R)': [], + 'Update (U)': [], + 'Delete (D)': [], + 'Find by vector': [], + f'Recall at k={K} for vector search': [], + 'Find by condition': [], + } + ) + + for col in benchmark_df.columns: + table.add_column(col) + + console = Console() + find_by_vector_values = {str(n_index): [] for n_index in n_index_values} + create_values = {str(n_index): [] for n_index in n_index_values} + + console.print(f'Reading dataset') + docs = get_docs(X_tr) + docs_to_delete = 
random.sample(docs, n_query) + docs_to_update = random.sample(docs, n_query) + vector_queries = [x for x in X_te] + ground_truth = [x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]] + + for idx, n_index in enumerate(n_index_values): + for backend, config in storage_backends: + try: + console.print('\nBackend:', backend.title()) + # for n_i in n_index: + if not config: + da = DocumentArray(storage=backend) + else: + da = DocumentArray(storage=backend, config=config) + + console.print(f'\tindexing {n_index} docs ...') + create_time, _ = create(da, docs) + + # for n_q in n_query: + console.print(f'\treading {n_query} docs ...') + read_time, _ = read( + da, + random.sample([d.id for d in docs], n_query), + ) + + console.print(f'\tupdating {n_query} docs ...') + update_time, _ = update(da, docs_to_update) + + console.print(f'\tdeleting {n_query} docs ...') + delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + + console.print( + f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
+ ) + if backend == 'memory': + find_by_vector_time, results = find_by_vector( + da, vector_queries[0], limit=K + ) + recall_at_k = recall_from_numpy( + np.array(results[:, 'tags__i']), ground_truth[0], K + ) + elif backend == 'sqlite': + find_by_vector_time, result = find_by_vector( + da, vector_queries[0], limit=K + ) + # recall_at_k = 1 + recall_at_k = recall_from_numpy( + np.array(results[:, 'tags__i']), ground_truth[0], K + ) + else: + recall_at_k_values = [] + find_by_vector_times = [] + for i, query in enumerate(vector_queries): + find_by_vector_time, results = find_by_vector( + da, query, limit=K + ) + find_by_vector_times.append(find_by_vector_time) + recall_at_k_values.append( + recall_from_numpy( + np.array(results[:, 'tags__i']), ground_truth[i], K + ) + ) + + recall_at_k = np.mean(recall_at_k_values) + find_by_vector_time = np.mean(find_by_vector_times) + + console.print(f'\tfinding {n_query} docs by condition ...') + find_by_condition_time, _ = find_by_condition( + da, storage_backend_filters[backend] + ) + + if idx == len(n_index_values) - 1: + table.add_row( + backend.title(), + fmt(create_time, 's'), + fmt(read_time * 1000, 'ms'), + fmt(update_time * 1000, 'ms'), + fmt(delete_time * 1000, 'ms'), + fmt(find_by_vector_time, 's'), + '{:.3f}'.format(recall_at_k), + fmt(find_by_condition_time, 's'), + ) + benchmark_df.loc[len(benchmark_df.index)] = [ + backend.title(), + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] + + # store find_by_vector time + find_by_vector_values[str(n_index)].append(find_by_vector_time) + create_values[str(n_index)].append(create_time) + da.clear() + del da + except Exception as e: + console.print(f'Storage Backend {backend} failed: {e}') + + console.print(table) + return find_by_vector_values, create_values, benchmark_df + + +def run_benchmark2( + train, + test, + ground_truth, + n_index, + n_vector_queries, + n_query, + storage_backends, + K, + D, 
+): + table = Table( + title=f'DocArray Benchmarking n_index={n_index} n_query={n_query} D={D} K={K}' + ) + benchmark_df = pd.DataFrame( + { + 'Storage Backend': [], + 'Indexing time (C)': [], + 'Query (R)': [], + 'Update (U)': [], + 'Delete (D)': [], + 'Find by vector': [], + f'Recall at k={K} for vector search': [], + 'Find by condition': [], + } + ) + + for col in benchmark_df.columns: + table.add_column(col) + + console = Console() + + console.print(f'Reading dataset') + docs = get_docs(train) + docs_to_delete = random.sample(docs, n_query) + docs_to_update = random.sample(docs, n_query) + vector_queries = [x for x in test] + + find_by_vector_time_all = [] + create_time_all = [] + + for backend, config in storage_backends: + try: + console.print('\nBackend:', backend.title()) + if not config: + da = DocumentArray(storage=backend) + else: + da = DocumentArray(storage=backend, config=config) + + console.print(f'\tindexing {n_index} docs ...') + create_time, _ = create(da, docs) + + # for n_q in n_query: + console.print(f'\treading {n_query} docs ...') + read_time, _ = read( + da, + random.sample([d.id for d in docs], n_query), + ) + + console.print(f'\tupdating {n_query} docs ...') + update_time, _ = update(da, docs_to_update) + + console.print(f'\tdeleting {n_query} docs ...') + delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + + console.print( + f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
+ ) + if backend == 'sqlite': + find_by_vector_time, result = find_by_vector( + da, vector_queries[0], limit=K + ) + recall_at_k = recall(result, ground_truth[0], K) + else: + recall_at_k_values = [] + find_by_vector_times = [] + for i, query in enumerate(vector_queries): + find_by_vector_time, results = find_by_vector(da, query, limit=K) + find_by_vector_times.append(find_by_vector_time) + if backend == 'memory': + ground_truth.append(results) + recall_at_k_values.append(1) + else: + recall_at_k_values.append(recall(results, ground_truth[i], K)) + + recall_at_k = np.mean(recall_at_k_values) + find_by_vector_time = np.mean(find_by_vector_times) + + console.print(f'\tfinding {n_query} docs by condition ...') + find_by_condition_time, _ = find_by_condition( + da, storage_backend_filters[backend] + ) + + table.add_row( + backend.title(), + fmt(create_time, 's'), + fmt(read_time * 1000, 'ms'), + fmt(update_time * 1000, 'ms'), + fmt(delete_time * 1000, 'ms'), + fmt(find_by_vector_time, 's'), + '{:.3f}'.format(recall_at_k), + fmt(find_by_condition_time, 's'), + ) + benchmark_df.loc[len(benchmark_df.index)] = [ + backend.title(), + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] + + find_by_vector_time_all.append(find_by_vector_time) + create_time_all.append(create_time) + + da.clear() + del da + except Exception as e: + console.print(f'Storage Backend {backend} failed: {e}') + + # print(find_by_vector_time_all) + console.print(table) + return find_by_vector_time_all, create_time_all, benchmark_df + + +def save_benchmark_df(benchmark_df, n_index): + benchmark_df.to_csv(f'benchmark-seconds-{n_index}.csv') benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply( lambda value: 1_000_000 / value @@ -114,7 +527,7 @@ def save_benchmark_df(benchmark_df): lambda value: 1 / value ) - benchmark_df.to_csv('benchmark-qps.csv') + benchmark_df.to_csv(f'benchmark-qps-{n_index}.csv') def 
plot_results( @@ -165,6 +578,6 @@ def plot_results( ax2.set_title('Indexing per backend', fontsize=18) plt.tight_layout() - # ax1.legend(fontsize=15) - # ax2.legend(fontsize=15) + ax1.legend(fontsize=15) + ax2.legend(fontsize=15) plt.savefig('benchmark.svg') From 50ddac9c1309a16f10886e060d780974730bd8e6 Mon Sep 17 00:00:00 2001 From: AnneY Date: Mon, 17 Oct 2022 20:14:07 +0800 Subject: [PATCH 25/33] refactor: reuse code in both random and sift1m --- scripts/benchmarking.py | 8 +- scripts/benchmarking_sift1m.py | 330 +++------------------------------ scripts/benchmarking_utils.py | 194 ++----------------- 3 files changed, 41 insertions(+), 491 deletions(-) diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index a586a30e0b5..2da10251704 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -5,19 +5,19 @@ from benchmarking_utils import ( get_configuration_storage_backends, plot_results, - run_benchmark2, + run_benchmark, save_benchmark_df, ) if __name__ == "__main__": # Parameters settable by the user - n_index_values = [1000] + n_index_values = [1_000_000] n_query = 1 D = 128 TENSOR_SHAPE = (512, 256) K = 10 - n_vector_queries = 100 + n_vector_queries = 1000 np.random.seed(123) # Benchmark @@ -30,7 +30,7 @@ test = [np.random.rand(D) for _ in range(n_vector_queries)] ground_truth = [] - find_by_vector_time_all, create_time_all, benchmark_df = run_benchmark2( + find_by_vector_time_all, create_time_all, benchmark_df = run_benchmark( train, test, ground_truth, diff --git a/scripts/benchmarking_sift1m.py b/scripts/benchmarking_sift1m.py index 2765dd465bd..00c768cef21 100644 --- a/scripts/benchmarking_sift1m.py +++ b/scripts/benchmarking_sift1m.py @@ -1,337 +1,49 @@ import argparse -import random -import pandas as pd - -from docarray import DocumentArray -from rich.console import Console -from rich.table import Table import h5py -import numpy as np from benchmarking_utils import ( - create, - read, - update, - delete, - find_by_condition, - 
find_by_vector, - get_docs, - fmt, - recall_from_numpy, - save_benchmark_df, + get_configuration_storage_backends, plot_results, + run_benchmark, + save_benchmark_df, ) - -def get_configuration_storage_backends(argparse): - parser = argparse.ArgumentParser() - parser.add_argument( - '--default-hnsw', - help='Whether to use default HNSW configurations', - action='store_true', - ) - - parser.add_argument( - '--exclude-backends', - help='list of comma separated backends to exclude from the benchmarks', - type=str, - ) - - args = parser.parse_args() - - if args.default_hnsw: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, - 'columns': {'i': 'int'}, - }, - ), - ( - 'qdrant', - { - 'n_dim': D, - 'scroll_batch_size': 8, - 'port': '41233', - }, - ), - ( - 'weaviate', - { - 'n_dim': D, - 'port': '41234', - 'columns': {'i': 'int'}, - }, - ), - ( - 'elasticsearch', - { - 'n_dim': D, - 'hosts': 'http://localhost:41235', - 'columns': {'i': 'int'}, - 'es_config': {'timeout': 300}, - }, - ), - ( - 'redis', - { - 'n_dim': D, - 'port': '41236', - 'columns': {'i': 'int'}, - }, - ), - ] - else: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, - 'ef_construction': 100, - 'ef_search': 100, - 'max_connection': 16, - 'columns': {'i': 'int'}, - }, - ), - ( - 'qdrant', - { - 'n_dim': D, - 'scroll_batch_size': 8, - 'ef_construct': 100, - 'm': 16, - 'port': '41233', - }, - ), - ( - 'weaviate', - { - 'n_dim': D, - 'ef': 100, - 'ef_construction': 100, - 'max_connections': 16, - 'port': '41234', - 'columns': {'i': 'int'}, - }, - ), - ( - 'elasticsearch', - { - 'n_dim': D, - 'ef_construction': 100, - 'm': 16, - 'hosts': 'http://localhost:41235', - 'columns': {'i': 'int'}, - }, - ), - ( - 'redis', - { - 'n_dim': D, - 'ef_construction': 100, - 'm': 16, - 'ef_runtime': 100, - 'port': '41236', - }, - ), - ] - - storage_backends = [ - (backend, config) - for backend, config in storage_backends - if 
backend not in (args.exclude_backends or '').split(',') - ] - return storage_backends - - -storage_backend_filters = { - 'memory': {'tags__i': {'$eq': 0}}, - 'sqlite': {'tags__i': {'$eq': 0}}, - 'annlite': {'i': {'$eq': 0}}, - 'qdrant': {'tags__i': {'$eq': 0}}, - 'weaviate': {'path': 'i', 'operator': 'Equal', 'valueInt': 0}, - 'elasticsearch': {'match': {'i': 0}}, - 'redis': {'i': {'$eq': 0}}, -} - - -def run_benchmark( - X_tr, - X_te, - dataset, - n_index_values, - n_vector_queries, - n_query, - storage_backends, - K, - D, -): - table = Table( - title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' - ) - benchmark_df = pd.DataFrame( - { - 'Storage Backend': [], - 'Indexing time (C)': [], - 'Query (R)': [], - 'Update (U)': [], - 'Delete (D)': [], - 'Find by vector': [], - f'Recall at k={K} for vector search': [], - 'Find by condition': [], - } - ) - - for col in benchmark_df.columns: - table.add_column(col) - - console = Console() - find_by_vector_values = {str(n_index): [] for n_index in n_index_values} - create_values = {str(n_index): [] for n_index in n_index_values} - - console.print(f'Reading dataset') - docs = get_docs(X_tr) - docs_to_delete = random.sample(docs, n_query) - docs_to_update = random.sample(docs, n_query) - vector_queries = [x for x in X_te] - ground_truth = [x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]] - - for idx, n_index in enumerate(n_index_values): - for backend, config in storage_backends: - try: - console.print('\nBackend:', backend.title()) - # for n_i in n_index: - if not config: - da = DocumentArray(storage=backend) - else: - da = DocumentArray(storage=backend, config=config) - - console.print(f'\tindexing {n_index} docs ...') - create_time, _ = create(da, docs) - - # for n_q in n_query: - console.print(f'\treading {n_query} docs ...') - read_time, _ = read( - da, - random.sample([d.id for d in docs], n_query), - ) - - console.print(f'\tupdating {n_query} docs ...') - update_time, _ 
= update(da, docs_to_update) - - console.print(f'\tdeleting {n_query} docs ...') - delete_time, _ = delete(da, [d.id for d in docs_to_delete]) - - console.print( - f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' - ) - if backend == 'memory': - find_by_vector_time, results = find_by_vector( - da, vector_queries[0], limit=K - ) - recall_at_k = recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[0], K - ) - elif backend == 'sqlite': - find_by_vector_time, result = find_by_vector( - da, vector_queries[0], limit=K - ) - # recall_at_k = 1 - recall_at_k = recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[0], K - ) - else: - recall_at_k_values = [] - find_by_vector_times = [] - for i, query in enumerate(vector_queries): - find_by_vector_time, results = find_by_vector( - da, query, limit=K - ) - find_by_vector_times.append(find_by_vector_time) - recall_at_k_values.append( - recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[i], K - ) - ) - - recall_at_k = np.mean(recall_at_k_values) - find_by_vector_time = np.mean(find_by_vector_times) - - console.print(f'\tfinding {n_query} docs by condition ...') - find_by_condition_time, _ = find_by_condition( - da, storage_backend_filters[backend] - ) - - if idx == len(n_index_values) - 1: - table.add_row( - backend.title(), - fmt(create_time, 's'), - fmt(read_time * 1000, 'ms'), - fmt(update_time * 1000, 'ms'), - fmt(delete_time * 1000, 'ms'), - fmt(find_by_vector_time, 's'), - '{:.3f}'.format(recall_at_k), - fmt(find_by_condition_time, 's'), - ) - benchmark_df.loc[len(benchmark_df.index)] = [ - backend.title(), - create_time, - read_time, - update_time, - delete_time, - find_by_vector_time, - recall_at_k, - find_by_condition_time, - ] - - # store find_by_vector time - find_by_vector_values[str(n_index)].append(find_by_vector_time) - create_values[str(n_index)].append(create_time) - da.clear() - del da - except Exception as e: - console.print(f'Storage Backend 
{backend} failed: {e}') - - console.print(table) - return find_by_vector_values, create_values, benchmark_df - - if __name__ == "__main__": # Parameters settable by the user n_query = 1 K = 10 DATASET_PATH = 'sift-128-euclidean.hdf5' - np.random.seed(123) # Variables gathered from the dataset dataset = h5py.File(DATASET_PATH, 'r') - X_tr = dataset['train'] - X_te = dataset['test'] - D = X_tr.shape[1] - n_index_values = [len(X_tr)] - n_vector_queries = len(X_te) + train = dataset['train'] + test = dataset['test'] + D = train.shape[1] + n_index = len(train) + n_vector_queries = len(test) + ground_truth = [x[0:K] for x in dataset['neighbors'][0:n_vector_queries]] # Benchmark - storage_backends = get_configuration_storage_backends(argparse) - find_by_vector_values, create_values, benchmark_df = run_benchmark( - X_tr, - X_te, - dataset, - n_index_values, + storage_backends = get_configuration_storage_backends(argparse, D) + find_by_vector_time_all, create_time_all, benchmark_df = run_benchmark( + train, + test, + ground_truth, + n_index, n_vector_queries, n_query, storage_backends, K, D, ) + + # store find_by_vector time + find_by_vector_values = {str(n_index): find_by_vector_time_all} + create_values = {str(n_index): create_time_all} + save_benchmark_df(benchmark_df, n_index) + plot_results( find_by_vector_values, storage_backends, create_values, plot_legend=False ) - save_benchmark_df(benchmark_df) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index d94606efb08..3aa84da1e7b 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -1,21 +1,15 @@ import functools -from time import perf_counter -from typing import Iterable - import random +from time import perf_counter +import matplotlib.pyplot as plt +import numpy as np import pandas as pd - +import seaborn as sns +from docarray import Document, DocumentArray from rich.console import Console from rich.table import Table -from docarray import DocumentArray -from 
docarray import Document -import numpy as np -import seaborn as sns -import matplotlib.pyplot as plt -import pandas as pd - def timer(func): @functools.wraps(func) @@ -57,14 +51,13 @@ def find_by_vector(da, query, limit): return da.find(query, limit=limit) -def get_docs(X_tr): +def get_docs(train): return [ Document( embedding=x, - # tensor=np.random.rand(*tensor_shape), tags={'i': int(i)}, ) - for i, x in enumerate(X_tr) + for i, x in enumerate(train) ] @@ -214,173 +207,12 @@ def recall(predicted, relevant, eval_at): return 0.0 predicted_at_k = predicted[:eval_at] n_predicted_and_relevant = len( - set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id'])) + set(predicted_at_k[:, 'id']).intersection(set(relevant)) ) return n_predicted_and_relevant / len(relevant) -def recall_from_numpy(predicted, relevant, eval_at: int): - """ - >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],5) - 1.0 - >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],4) - 0.8 - >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],3) - 0.3 - >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],2) - 0.4 - >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],1) - 0.2 - """ - if eval_at == 0: - return 0.0 - predicted_at_k = predicted[:eval_at] - n_predicted_and_relevant = len(set(predicted_at_k).intersection(set(relevant))) - return n_predicted_and_relevant / len(relevant) - - def run_benchmark( - X_tr, - X_te, - dataset, - n_index_values, - n_vector_queries, - n_query, - storage_backends, - K, - D, -): - table = Table( - title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}' - ) - benchmark_df = pd.DataFrame( - { - 'Storage Backend': [], - 'Indexing time (C)': [], - 'Query (R)': [], - 'Update (U)': [], - 'Delete (D)': [], - 'Find by vector': [], - f'Recall at k={K} for vector search': [], - 'Find by condition': [], - } - ) - - for col in benchmark_df.columns: - table.add_column(col) - - console = Console() - find_by_vector_values = {str(n_index): [] for n_index in n_index_values} 
- create_values = {str(n_index): [] for n_index in n_index_values} - - console.print(f'Reading dataset') - docs = get_docs(X_tr) - docs_to_delete = random.sample(docs, n_query) - docs_to_update = random.sample(docs, n_query) - vector_queries = [x for x in X_te] - ground_truth = [x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]] - - for idx, n_index in enumerate(n_index_values): - for backend, config in storage_backends: - try: - console.print('\nBackend:', backend.title()) - # for n_i in n_index: - if not config: - da = DocumentArray(storage=backend) - else: - da = DocumentArray(storage=backend, config=config) - - console.print(f'\tindexing {n_index} docs ...') - create_time, _ = create(da, docs) - - # for n_q in n_query: - console.print(f'\treading {n_query} docs ...') - read_time, _ = read( - da, - random.sample([d.id for d in docs], n_query), - ) - - console.print(f'\tupdating {n_query} docs ...') - update_time, _ = update(da, docs_to_update) - - console.print(f'\tdeleting {n_query} docs ...') - delete_time, _ = delete(da, [d.id for d in docs_to_delete]) - - console.print( - f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
- ) - if backend == 'memory': - find_by_vector_time, results = find_by_vector( - da, vector_queries[0], limit=K - ) - recall_at_k = recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[0], K - ) - elif backend == 'sqlite': - find_by_vector_time, result = find_by_vector( - da, vector_queries[0], limit=K - ) - # recall_at_k = 1 - recall_at_k = recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[0], K - ) - else: - recall_at_k_values = [] - find_by_vector_times = [] - for i, query in enumerate(vector_queries): - find_by_vector_time, results = find_by_vector( - da, query, limit=K - ) - find_by_vector_times.append(find_by_vector_time) - recall_at_k_values.append( - recall_from_numpy( - np.array(results[:, 'tags__i']), ground_truth[i], K - ) - ) - - recall_at_k = np.mean(recall_at_k_values) - find_by_vector_time = np.mean(find_by_vector_times) - - console.print(f'\tfinding {n_query} docs by condition ...') - find_by_condition_time, _ = find_by_condition( - da, storage_backend_filters[backend] - ) - - if idx == len(n_index_values) - 1: - table.add_row( - backend.title(), - fmt(create_time, 's'), - fmt(read_time * 1000, 'ms'), - fmt(update_time * 1000, 'ms'), - fmt(delete_time * 1000, 'ms'), - fmt(find_by_vector_time, 's'), - '{:.3f}'.format(recall_at_k), - fmt(find_by_condition_time, 's'), - ) - benchmark_df.loc[len(benchmark_df.index)] = [ - backend.title(), - create_time, - read_time, - update_time, - delete_time, - find_by_vector_time, - recall_at_k, - find_by_condition_time, - ] - - # store find_by_vector time - find_by_vector_values[str(n_index)].append(find_by_vector_time) - create_values[str(n_index)].append(create_time) - da.clear() - del da - except Exception as e: - console.print(f'Storage Backend {backend} failed: {e}') - - console.print(table) - return find_by_vector_values, create_values, benchmark_df - - -def run_benchmark2( train, test, ground_truth, @@ -448,7 +280,13 @@ def run_benchmark2( console.print( f'\tfinding {n_query} 
docs by vector averaged {n_vector_queries} times ...' ) - if backend == 'sqlite': + + if backend == 'memory' and len(ground_truth) == n_vector_queries: + find_by_vector_time, results = find_by_vector( + da, vector_queries[0], limit=K + ) + recall_at_k = recall(results, ground_truth[0], K) + elif backend == 'sqlite': find_by_vector_time, result = find_by_vector( da, vector_queries[0], limit=K ) @@ -460,7 +298,7 @@ def run_benchmark2( find_by_vector_time, results = find_by_vector(da, query, limit=K) find_by_vector_times.append(find_by_vector_time) if backend == 'memory': - ground_truth.append(results) + ground_truth.append(results[:, 'tags__i']) recall_at_k_values.append(1) else: recall_at_k_values.append(recall(results, ground_truth[i], K)) From 6cf9d29cc437bef0ad70b7f11925ac4f4aafe312 Mon Sep 17 00:00:00 2001 From: AnneY Date: Tue, 18 Oct 2022 14:07:07 +0800 Subject: [PATCH 26/33] fix: fix metric --- scripts/benchmarking_utils.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index 3aa84da1e7b..a3f5af1f774 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -47,8 +47,8 @@ def find_by_condition(da, query): @timer -def find_by_vector(da, query, limit): - return da.find(query, limit=limit) +def find_by_vector(da, query, limit, **kwargs): + return da.find(query, limit=limit, **kwargs) def get_docs(train): @@ -122,6 +122,7 @@ def get_configuration_storage_backends(argparse, D): { 'n_dim': D, 'port': '41236', + 'distance': 'L2', 'columns': {'i': 'int'}, }, ), @@ -179,6 +180,7 @@ def get_configuration_storage_backends(argparse, D): 'm': 16, 'ef_runtime': 100, 'port': '41236', + 'columns': {'i': 'int'}, }, ), ] @@ -207,7 +209,7 @@ def recall(predicted, relevant, eval_at): return 0.0 predicted_at_k = predicted[:eval_at] n_predicted_and_relevant = len( - set(predicted_at_k[:, 'id']).intersection(set(relevant)) + set(predicted_at_k[:, 
'tags__i']).intersection(set(relevant)) ) return n_predicted_and_relevant / len(relevant) @@ -283,12 +285,12 @@ def run_benchmark( if backend == 'memory' and len(ground_truth) == n_vector_queries: find_by_vector_time, results = find_by_vector( - da, vector_queries[0], limit=K + da=da, query=vector_queries[0], limit=K, metric='euclidean' ) recall_at_k = recall(results, ground_truth[0], K) elif backend == 'sqlite': find_by_vector_time, result = find_by_vector( - da, vector_queries[0], limit=K + da, vector_queries[0], limit=K, metric='euclidean' ) recall_at_k = recall(result, ground_truth[0], K) else: From 7b819b52a6336e80cd2daf2960084559c409add9 Mon Sep 17 00:00:00 2001 From: AnneY Date: Tue, 18 Oct 2022 14:47:12 +0800 Subject: [PATCH 27/33] feat: add hnsw params benchmark --- scripts/benchmarking_sift1m_RQ.py | 68 ++++++++++++++ scripts/benchmarking_utils.py | 142 +++++++++++++++++++++++++++++- 2 files changed, 206 insertions(+), 4 deletions(-) create mode 100644 scripts/benchmarking_sift1m_RQ.py diff --git a/scripts/benchmarking_sift1m_RQ.py b/scripts/benchmarking_sift1m_RQ.py new file mode 100644 index 00000000000..ffe8db7bd39 --- /dev/null +++ b/scripts/benchmarking_sift1m_RQ.py @@ -0,0 +1,68 @@ +import h5py + +from benchmarking_utils import ( + run_benchmark2, + save_benchmark_df, +) + +if __name__ == "__main__": + + # Parameters settable by the user + n_query = 1 + K = 10 + DATASET_PATH = 'sift-128-euclidean.hdf5' + + # Variables gathered from the dataset + dataset = h5py.File(DATASET_PATH, 'r') + train = dataset['train'][:1000] + test = dataset['test'][:100] + D = train.shape[1] + n_index = len(train) + n_vector_queries = len(test) + ground_truth = [x[0:K] for x in dataset['neighbors'][0:n_vector_queries]] + + # Benchmark + storage = 'redis' + storage_config = [ + { + 'n_dim': D, + 'port': '41236', + 'distance': 'L2', + 'columns': {'i': 'int'}, + 'm': 16, + 'ef_construction': 200, + 'ef_runtime': 10, + }, + { + 'n_dim': D, + 'port': '41236', + 'distance': 
'L2', + 'columns': {'i': 'int'}, + 'm': 16, + 'ef_construction': 200, + 'ef_runtime': 20, + }, + { + 'n_dim': D, + 'port': '41236', + 'distance': 'L2', + 'columns': {'i': 'int'}, + 'm': 16, + 'ef_construction': 400, + 'ef_runtime': 10, + }, + ] + benchmark_df = run_benchmark2( + train, + test, + ground_truth, + n_index, + n_vector_queries, + n_query, + storage, + storage_config, + K, + ) + + # store benchmark time + save_benchmark_df(benchmark_df, storage) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index a3f5af1f774..ebcbc613817 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -342,13 +342,12 @@ def run_benchmark( except Exception as e: console.print(f'Storage Backend {backend} failed: {e}') - # print(find_by_vector_time_all) console.print(table) return find_by_vector_time_all, create_time_all, benchmark_df -def save_benchmark_df(benchmark_df, n_index): - benchmark_df.to_csv(f'benchmark-seconds-{n_index}.csv') +def save_benchmark_df(benchmark_df, n): + benchmark_df.to_csv(f'benchmark-seconds-{n}.csv') benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply( lambda value: 1_000_000 / value @@ -367,7 +366,7 @@ def save_benchmark_df(benchmark_df, n_index): lambda value: 1 / value ) - benchmark_df.to_csv(f'benchmark-qps-{n_index}.csv') + benchmark_df.to_csv(f'benchmark-qps-{n}.csv') def plot_results( @@ -421,3 +420,138 @@ def plot_results( ax1.legend(fontsize=15) ax2.legend(fontsize=15) plt.savefig('benchmark.svg') + + +def run_benchmark2( + train, + test, + ground_truth, + n_index, + n_vector_queries, + n_query, + backend, + storage_config, + K, +): + table = Table(title=f'DocArray Sift1M Benchmarking backend={backend}') + benchmark_df = pd.DataFrame( + { + 'Storage Backend': [], + 'M': [], + 'EF_CONSTRUCTION': [], + 'EF_RUNTIME': [], + 'Indexing time (C)': [], + 'Query (R)': [], + 'Update (U)': [], + 'Delete (D)': [], + 'Find by vector': [], + f'Recall at k={K} for vector search': 
[], + 'Find by condition': [], + } + ) + + for col in benchmark_df.columns: + table.add_column(col) + + console = Console() + + console.print(f'Reading dataset') + docs = get_docs(train) + docs_to_delete = random.sample(docs, n_query) + docs_to_update = random.sample(docs, n_query) + vector_queries = [x for x in test] + + for config in storage_config: + try: + console.print('\nBackend:', backend.title()) + console.print('\nBackend config', str(config)) + + if not config: + da = DocumentArray(storage=backend) + else: + da = DocumentArray(storage=backend, config=config) + + console.print(f'\tindexing {n_index} docs ...') + create_time, _ = create(da, docs) + + # for n_q in n_query: + console.print(f'\treading {n_query} docs ...') + read_time, _ = read( + da, + random.sample([d.id for d in docs], n_query), + ) + + console.print(f'\tupdating {n_query} docs ...') + update_time, _ = update(da, docs_to_update) + + console.print(f'\tdeleting {n_query} docs ...') + delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + + console.print( + f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
+ ) + + if backend == 'memory' and len(ground_truth) == n_vector_queries: + find_by_vector_time, results = find_by_vector( + da=da, query=vector_queries[0], limit=K, metric='euclidean' + ) + recall_at_k = recall(results, ground_truth[0], K) + elif backend == 'sqlite': + find_by_vector_time, result = find_by_vector( + da, vector_queries[0], limit=K, metric='euclidean' + ) + recall_at_k = recall(result, ground_truth[0], K) + else: + recall_at_k_values = [] + find_by_vector_times = [] + for i, query in enumerate(vector_queries): + find_by_vector_time, results = find_by_vector(da, query, limit=K) + find_by_vector_times.append(find_by_vector_time) + if backend == 'memory': + ground_truth.append(results[:, 'tags__i']) + recall_at_k_values.append(1) + else: + recall_at_k_values.append(recall(results, ground_truth[i], K)) + + recall_at_k = np.mean(recall_at_k_values) + find_by_vector_time = np.mean(find_by_vector_times) + + console.print(f'\tfinding {n_query} docs by condition ...') + find_by_condition_time, _ = find_by_condition( + da, storage_backend_filters[backend] + ) + + table.add_row( + backend.title(), + str(config['m']), + str(config['ef_construction']), + str(config['ef_runtime']), + fmt(create_time, 's'), + fmt(read_time * 1000, 'ms'), + fmt(update_time * 1000, 'ms'), + fmt(delete_time * 1000, 'ms'), + fmt(find_by_vector_time, 's'), + '{:.3f}'.format(recall_at_k), + fmt(find_by_condition_time, 's'), + ) + benchmark_df.loc[len(benchmark_df.index)] = [ + backend.title(), + config['m'], + config['ef_construction'], + config['ef_runtime'], + create_time, + read_time, + update_time, + delete_time, + find_by_vector_time, + recall_at_k, + find_by_condition_time, + ] + + da.clear() + del da + except Exception as e: + console.print(f'Storage Backend {backend} failed: {e}') + + console.print(table) + return benchmark_df From 4cfc38c45d45ee5e82811b7d013004253e2ffe70 Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 19 Oct 2022 12:34:28 +0800 Subject: [PATCH 28/33] feat: add 
sift benchmark recall/query lines --- scripts/benchmarking.py | 9 +- scripts/benchmarking_sift1m.py | 65 +++--- scripts/benchmarking_sift1m_RQ.py | 68 ------ scripts/benchmarking_utils.py | 333 ++++++++++++++++++------------ 4 files changed, 243 insertions(+), 232 deletions(-) delete mode 100644 scripts/benchmarking_sift1m_RQ.py diff --git a/scripts/benchmarking.py b/scripts/benchmarking.py index 2da10251704..18a74554161 100644 --- a/scripts/benchmarking.py +++ b/scripts/benchmarking.py @@ -3,6 +3,7 @@ import numpy as np from benchmarking_utils import ( + get_docs, get_configuration_storage_backends, plot_results, run_benchmark, @@ -21,7 +22,7 @@ np.random.seed(123) # Benchmark - storage_backends = get_configuration_storage_backends(argparse, D) + storage_backends = get_configuration_storage_backends(argparse, D, True) find_by_vector_values = {str(n_index): [] for n_index in n_index_values} create_values = {str(n_index): [] for n_index in n_index_values} @@ -30,8 +31,11 @@ test = [np.random.rand(D) for _ in range(n_vector_queries)] ground_truth = [] + print(f'Reading dataset') + docs = get_docs(train) + find_by_vector_time_all, create_time_all, benchmark_df = run_benchmark( - train, + docs, test, ground_truth, n_index, @@ -39,7 +43,6 @@ n_query, storage_backends, K, - D, ) # store find_by_vector time diff --git a/scripts/benchmarking_sift1m.py b/scripts/benchmarking_sift1m.py index 00c768cef21..a65f28897e3 100644 --- a/scripts/benchmarking_sift1m.py +++ b/scripts/benchmarking_sift1m.py @@ -1,11 +1,13 @@ import argparse +from itertools import product import h5py from benchmarking_utils import ( get_configuration_storage_backends, - plot_results, - run_benchmark, + get_docs, + plot_results_sift, + run_benchmark_sift, save_benchmark_df, ) @@ -18,32 +20,43 @@ # Variables gathered from the dataset dataset = h5py.File(DATASET_PATH, 'r') - train = dataset['train'] - test = dataset['test'] + train = dataset['train'][:1000] + test = dataset['test'][:10] D = train.shape[1] 
n_index = len(train) n_vector_queries = len(test) ground_truth = [x[0:K] for x in dataset['neighbors'][0:n_vector_queries]] - # Benchmark - storage_backends = get_configuration_storage_backends(argparse, D) - find_by_vector_time_all, create_time_all, benchmark_df = run_benchmark( - train, - test, - ground_truth, - n_index, - n_vector_queries, - n_query, - storage_backends, - K, - D, - ) - - # store find_by_vector time - find_by_vector_values = {str(n_index): find_by_vector_time_all} - create_values = {str(n_index): create_time_all} - save_benchmark_df(benchmark_df, n_index) - - plot_results( - find_by_vector_values, storage_backends, create_values, plot_legend=False - ) + print(f'Reading dataset') + docs = get_docs(train) + + BENCHMARK_CONFIG = get_configuration_storage_backends(argparse, D, False) + + for storage, cfg in BENCHMARK_CONFIG.items(): + + storage_config = cfg['storage_config'] + hnsw_config = [] + + if storage != 'memory' and storage != 'sqlite': + for hnsw_cfg in product(*cfg['hnsw_config'].values()): + hnsw_config.append(dict(zip(cfg['hnsw_config'].keys(), hnsw_cfg))) + else: + hnsw_config.append({}) + + benchmark_df = run_benchmark_sift( + test, + docs, + ground_truth, + n_index, + n_vector_queries, + n_query, + storage, + storage_config, + hnsw_config, + K, + ) + + # store benchmark time + save_benchmark_df(benchmark_df, storage) + + plot_results_sift(BENCHMARK_CONFIG.keys()) diff --git a/scripts/benchmarking_sift1m_RQ.py b/scripts/benchmarking_sift1m_RQ.py deleted file mode 100644 index ffe8db7bd39..00000000000 --- a/scripts/benchmarking_sift1m_RQ.py +++ /dev/null @@ -1,68 +0,0 @@ -import h5py - -from benchmarking_utils import ( - run_benchmark2, - save_benchmark_df, -) - -if __name__ == "__main__": - - # Parameters settable by the user - n_query = 1 - K = 10 - DATASET_PATH = 'sift-128-euclidean.hdf5' - - # Variables gathered from the dataset - dataset = h5py.File(DATASET_PATH, 'r') - train = dataset['train'][:1000] - test = dataset['test'][:100] 
- D = train.shape[1] - n_index = len(train) - n_vector_queries = len(test) - ground_truth = [x[0:K] for x in dataset['neighbors'][0:n_vector_queries]] - - # Benchmark - storage = 'redis' - storage_config = [ - { - 'n_dim': D, - 'port': '41236', - 'distance': 'L2', - 'columns': {'i': 'int'}, - 'm': 16, - 'ef_construction': 200, - 'ef_runtime': 10, - }, - { - 'n_dim': D, - 'port': '41236', - 'distance': 'L2', - 'columns': {'i': 'int'}, - 'm': 16, - 'ef_construction': 200, - 'ef_runtime': 20, - }, - { - 'n_dim': D, - 'port': '41236', - 'distance': 'L2', - 'columns': {'i': 'int'}, - 'm': 16, - 'ef_construction': 400, - 'ef_runtime': 10, - }, - ] - benchmark_df = run_benchmark2( - train, - test, - ground_truth, - n_index, - n_vector_queries, - n_query, - storage, - storage_config, - K, - ) - - # store benchmark time - save_benchmark_df(benchmark_df, storage) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index ebcbc613817..cffd6df4800 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -65,7 +65,7 @@ def fmt(value, unit): return '{:.3f} {}'.format(value, unit) -def get_configuration_storage_backends(argparse, D): +def get_configuration_storage_backends(argparse, D, random=True): parser = argparse.ArgumentParser() parser.add_argument( '--default-hnsw', @@ -81,115 +81,128 @@ def get_configuration_storage_backends(argparse, D): args = parser.parse_args() - if args.default_hnsw: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, - 'columns': {'i': 'int'}, - }, - ), - ( - 'qdrant', - { - 'n_dim': D, - 'scroll_batch_size': 8, - 'port': '41233', - }, - ), - ( - 'weaviate', - { - 'n_dim': D, - 'port': '41234', - 'columns': {'i': 'int'}, - }, - ), - ( - 'elasticsearch', + storage_backends = { + 'memory': { + 'storage_config': None, + }, + 'sqlite': { + 'storage_config': None, + }, + 'annlite': { + 'storage_config': { + 'n_dim': D, + 'columns': {'i': 'int'}, + }, + }, + 
'qdrant': { + 'storage_config': { + 'n_dim': D, + 'port': '41233', + 'scroll_batch_size': 8, + }, + }, + 'weaviate': { + 'storage_config': { + 'n_dim': D, + 'port': '41234', + 'columns': {'i': 'int'}, + }, + }, + 'elasticsearch': { + 'storage_config': { + 'n_dim': D, + 'hosts': 'http://localhost:41235', + 'columns': {'i': 'int'}, + 'es_config': {'timeout': 1000}, + }, + }, + 'redis': { + 'storage_config': { + 'n_dim': D, + 'port': '41236', + 'columns': {'i': 'int'}, + }, + }, + } + + if random: + if not args.default_hnsw: + storage_backends['annlite']['storage_config'].update( { - 'n_dim': D, - 'hosts': 'http://localhost:41235', - 'columns': {'i': 'int'}, - 'es_config': {'timeout': 1000}, - }, - ), - ( - 'redis', - { - 'n_dim': D, - 'port': '41236', - 'distance': 'L2', - 'columns': {'i': 'int'}, - }, - ), - ] - else: - storage_backends = [ - ('memory', None), - ('sqlite', None), - ( - 'annlite', - { - 'n_dim': D, 'ef_construction': 100, 'ef_search': 100, 'max_connection': 16, - 'columns': {'i': 'int'}, - }, - ), - ( - 'qdrant', + } + ) + storage_backends['qdrant']['storage_config'].update( { - 'n_dim': D, - 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16, - 'port': '41233', - }, - ), - ( - 'weaviate', + } + ) + storage_backends['weaviate']['storage_config'].update( { - 'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16, - 'port': '41234', - 'columns': {'i': 'int'}, - }, - ), - ( - 'elasticsearch', + } + ) + storage_backends['elasticsearch']['storage_config'].update( { - 'n_dim': D, 'ef_construction': 100, 'm': 16, - 'hosts': 'http://localhost:41235', - 'columns': {'i': 'int'}, - }, - ), - ( - 'redis', + } + ) + storage_backends['redis']['storage_config'].update( { - 'n_dim': D, 'ef_construction': 100, 'm': 16, 'ef_runtime': 100, - 'port': '41236', - 'columns': {'i': 'int'}, - }, - ), + } + ) + + storage_backends = [ + (storage, configs['storage_config']) + for storage, configs in storage_backends.items() + if storage not in 
(args.exclude_backends or '').split(',') ] + else: + storage_backends['annlite']['storage_config']['metric'] = 'euclidean' + storage_backends['annlite']['hnsw_config'] = { + 'max_connection': [16, 32], + 'ef_construction': [128, 256], + 'ef_search': [16, 32], + } + + storage_backends['qdrant']['storage_config']['distance'] = 'euclidean' + storage_backends['qdrant']['hnsw_config'] = { + 'm': [16, 32], + 'ef_construct': [128, 256], + } + + storage_backends['weaviate']['storage_config']['distance'] = 'l2-squared' + storage_backends['weaviate']['hnsw_config'] = { + 'max_connections': [16, 32], + 'ef_construction': [128, 256], + 'ef': [16, 32], + } + + storage_backends['elasticsearch']['storage_config']['distance'] = 'l2_norm' + storage_backends['elasticsearch']['hnsw_config'] = { + 'm': [16, 32], + 'ef_construction': [128, 256], + } + + storage_backends['redis']['storage_config']['distance'] = 'L2' + storage_backends['redis']['hnsw_config'] = { + 'm': [16, 32], + 'ef_construction': [128, 256], + 'ef_runtime': [16, 32], + } + + for storage in (args.exclude_backends or '').split(','): + storage_backends.pop(storage) - storage_backends = [ - (backend, config) - for backend, config in storage_backends - if backend not in (args.exclude_backends or '').split(',') - ] return storage_backends @@ -215,7 +228,7 @@ def recall(predicted, relevant, eval_at): def run_benchmark( - train, + docs, test, ground_truth, n_index, @@ -223,10 +236,9 @@ def run_benchmark( n_query, storage_backends, K, - D, ): table = Table( - title=f'DocArray Benchmarking n_index={n_index} n_query={n_query} D={D} K={K}' + title=f'DocArray Random Benchmarking n_index={n_index} n_query={n_query} D={test[0].shape[0]} K={K}' ) benchmark_df = pd.DataFrame( { @@ -246,8 +258,6 @@ def run_benchmark( console = Console() - console.print(f'Reading dataset') - docs = get_docs(train) docs_to_delete = random.sample(docs, n_query) docs_to_update = random.sample(docs, n_query) vector_queries = [x for x in test] @@ -258,6 
+268,8 @@ def run_benchmark( for backend, config in storage_backends: try: console.print('\nBackend:', backend.title()) + console.print('Backend config', str(config)) + if not config: da = DocumentArray(storage=backend) else: @@ -283,14 +295,9 @@ def run_benchmark( f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' ) - if backend == 'memory' and len(ground_truth) == n_vector_queries: - find_by_vector_time, results = find_by_vector( - da=da, query=vector_queries[0], limit=K, metric='euclidean' - ) - recall_at_k = recall(results, ground_truth[0], K) - elif backend == 'sqlite': + if backend == 'sqlite': find_by_vector_time, result = find_by_vector( - da, vector_queries[0], limit=K, metric='euclidean' + da, vector_queries[0], limit=K ) recall_at_k = recall(result, ground_truth[0], K) else: @@ -422,18 +429,19 @@ def plot_results( plt.savefig('benchmark.svg') -def run_benchmark2( - train, +def run_benchmark_sift( test, + docs, ground_truth, n_index, n_vector_queries, n_query, - backend, + storage, storage_config, + hnsw_config, K, ): - table = Table(title=f'DocArray Sift1M Benchmarking backend={backend}') + table = Table(title=f'DocArray Sift1M Benchmarking storage={storage}') benchmark_df = pd.DataFrame( { 'Storage Backend': [], @@ -455,21 +463,20 @@ def run_benchmark2( console = Console() - console.print(f'Reading dataset') - docs = get_docs(train) docs_to_delete = random.sample(docs, n_query) docs_to_update = random.sample(docs, n_query) vector_queries = [x for x in test] - for config in storage_config: + for config in hnsw_config: try: - console.print('\nBackend:', backend.title()) - console.print('\nBackend config', str(config)) + console.print('\nBackend:', storage.title()) + console.print('Backend hnsw config', str(config)) - if not config: - da = DocumentArray(storage=backend) + if not storage_config: + da = DocumentArray(storage=storage) else: - da = DocumentArray(storage=backend, config=config) + config.update(storage_config) + da = 
DocumentArray(storage=storage, config=config) console.print(f'\tindexing {n_index} docs ...') create_time, _ = create(da, docs) @@ -491,12 +498,7 @@ def run_benchmark2( f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' ) - if backend == 'memory' and len(ground_truth) == n_vector_queries: - find_by_vector_time, results = find_by_vector( - da=da, query=vector_queries[0], limit=K, metric='euclidean' - ) - recall_at_k = recall(results, ground_truth[0], K) - elif backend == 'sqlite': + if storage == 'memory' or storage == 'sqlite': find_by_vector_time, result = find_by_vector( da, vector_queries[0], limit=K, metric='euclidean' ) @@ -507,25 +509,21 @@ def run_benchmark2( for i, query in enumerate(vector_queries): find_by_vector_time, results = find_by_vector(da, query, limit=K) find_by_vector_times.append(find_by_vector_time) - if backend == 'memory': - ground_truth.append(results[:, 'tags__i']) - recall_at_k_values.append(1) - else: - recall_at_k_values.append(recall(results, ground_truth[i], K)) + recall_at_k_values.append(recall(results, ground_truth[i], K)) recall_at_k = np.mean(recall_at_k_values) find_by_vector_time = np.mean(find_by_vector_times) console.print(f'\tfinding {n_query} docs by condition ...') find_by_condition_time, _ = find_by_condition( - da, storage_backend_filters[backend] + da, storage_backend_filters[storage] ) table.add_row( - backend.title(), - str(config['m']), - str(config['ef_construction']), - str(config['ef_runtime']), + storage.title(), + str(config.get(get_param(storage, 'M'), None)), + str(config.get(get_param(storage, 'EF_CONSTRUCTION'), None)), + str(config.get(get_param(storage, 'EF_RUNTIME'), None)), fmt(create_time, 's'), fmt(read_time * 1000, 'ms'), fmt(update_time * 1000, 'ms'), @@ -535,10 +533,10 @@ def run_benchmark2( fmt(find_by_condition_time, 's'), ) benchmark_df.loc[len(benchmark_df.index)] = [ - backend.title(), - config['m'], - config['ef_construction'], - config['ef_runtime'], + 
storage.title(), + config.get(get_param(storage, 'M'), None), + config.get(get_param(storage, 'EF_CONSTRUCTION'), None), + config.get(get_param(storage, 'EF_RUNTIME'), None), create_time, read_time, update_time, @@ -548,10 +546,75 @@ def run_benchmark2( find_by_condition_time, ] + console.print(table) da.clear() del da except Exception as e: - console.print(f'Storage Backend {backend} failed: {e}') + console.print(f'Storage Backend {storage} failed: {e}') console.print(table) return benchmark_df + + +param_dict = { + 'annlite': { + 'M': 'max_connection', + 'EF_CONSTRUCTION': 'ef_construction', + 'EF_RUNTIME': 'ef_search', + }, + 'qdrant': {'M': 'm', 'EF_CONSTRUCTION': 'ef_construct'}, + 'weaviate': { + 'M': 'max_connections', + 'EF_CONSTRUCTION': 'ef_construction', + 'EF_RUNTIME': 'ef', + }, + 'elasticsearch': {'M': 'm', 'EF_CONSTRUCTION': 'ef_construction'}, + 'redis': { + 'M': 'm', + 'EF_CONSTRUCTION': 'ef_construction', + 'EF_RUNTIME': 'ef_runtime', + }, +} + + +def get_param(storage, param): + if storage == 'memory' or storage == 'sqlite': + return param + + return param_dict[storage].get(param, param) + + +def plot_results_sift(storages): + fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) + + for storage in storages: + df = pd.read_csv(f'benchmark-qps-{storage}.csv') + df.rename(columns={'Recall at k=10 for vector search': 'Recall'}, inplace=True) + df.sort_values(by=['Recall'], inplace=True) + df.plot( + style='.-', + ax=ax1, + x='Recall', + y='Query (R)', + ylabel='Queries per second (1/s)', + label=storage, + ) + + ax1.set_title('Recall/Queries per second (1/s)', fontsize=18) + + for storage in storages: + df = pd.read_csv(f'benchmark-seconds-{storage}.csv') + df.rename(columns={'Recall at k=10 for vector search': 'Recall'}, inplace=True) + df.sort_values(by=['Recall'], inplace=True) + df.plot( + style='.-', + ax=ax2, + x='Recall', + y='Indexing time (C)', + ylabel='Indexing time (s)', + label=storage, + ) + + ax2.set_title('Recall/Indexing time (s)', 
fontsize=18) + + plt.savefig('benchmark.png') From e4c8b1d967858e8fb81adb14e475edaa4266077a Mon Sep 17 00:00:00 2001 From: AnneY Date: Wed, 19 Oct 2022 21:20:42 +0800 Subject: [PATCH 29/33] fix: remove test subset --- scripts/benchmarking_sift1m.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/benchmarking_sift1m.py b/scripts/benchmarking_sift1m.py index a65f28897e3..f389362de84 100644 --- a/scripts/benchmarking_sift1m.py +++ b/scripts/benchmarking_sift1m.py @@ -20,8 +20,8 @@ # Variables gathered from the dataset dataset = h5py.File(DATASET_PATH, 'r') - train = dataset['train'][:1000] - test = dataset['test'][:10] + train = dataset['train'] + test = dataset['test'] D = train.shape[1] n_index = len(train) n_vector_queries = len(test) From 74499dd251f43846d61a2af68467e33bcab265d4 Mon Sep 17 00:00:00 2001 From: AnneY Date: Fri, 21 Oct 2022 10:21:20 +0800 Subject: [PATCH 30/33] refactor: change hnsw params --- scripts/benchmarking_utils.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index cffd6df4800..f50cae6482a 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -77,6 +77,7 @@ def get_configuration_storage_backends(argparse, D, random=True): '--exclude-backends', help='list of comma separated backends to exclude from the benchmarks', type=str, + default='', ) args = parser.parse_args() @@ -176,8 +177,8 @@ def get_configuration_storage_backends(argparse, D, random=True): storage_backends['qdrant']['storage_config']['distance'] = 'euclidean' storage_backends['qdrant']['hnsw_config'] = { - 'm': [16, 32], - 'ef_construct': [128, 256], + 'm': [8, 12, 16, 32], + 'ef_construct': [32, 64, 128], } storage_backends['weaviate']['storage_config']['distance'] = 'l2-squared' @@ -189,8 +190,8 @@ def get_configuration_storage_backends(argparse, D, random=True): storage_backends['elasticsearch']['storage_config']['distance'] 
= 'l2_norm' storage_backends['elasticsearch']['hnsw_config'] = { - 'm': [16, 32], - 'ef_construction': [128, 256], + 'm': [8, 12, 16], + 'ef_construction': [16, 32, 64, 128], } storage_backends['redis']['storage_config']['distance'] = 'L2' @@ -200,8 +201,8 @@ def get_configuration_storage_backends(argparse, D, random=True): 'ef_runtime': [16, 32], } - for storage in (args.exclude_backends or '').split(','): - storage_backends.pop(storage) + for storage in args.exclude_backends.split(','): + storage_backends.pop(storage, None) return storage_backends @@ -586,6 +587,8 @@ def get_param(storage, param): def plot_results_sift(storages): fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) + storages = list(storages) + storages.remove('memory') for storage in storages: df = pd.read_csv(f'benchmark-qps-{storage}.csv') From 9583a5d979a5f361fabfd3e47b8d4f0d69caa94e Mon Sep 17 00:00:00 2001 From: AnneY Date: Fri, 21 Oct 2022 20:12:51 +0800 Subject: [PATCH 31/33] fix: update and delete should be after find --- scripts/benchmarking_utils.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index f50cae6482a..485503599a2 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -489,12 +489,6 @@ def run_benchmark_sift( random.sample([d.id for d in docs], n_query), ) - console.print(f'\tupdating {n_query} docs ...') - update_time, _ = update(da, docs_to_update) - - console.print(f'\tdeleting {n_query} docs ...') - delete_time, _ = delete(da, [d.id for d in docs_to_delete]) - console.print( f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...' 
) @@ -520,6 +514,12 @@ def run_benchmark_sift( da, storage_backend_filters[storage] ) + console.print(f'\tupdating {n_query} docs ...') + update_time, _ = update(da, docs_to_update) + + console.print(f'\tdeleting {n_query} docs ...') + delete_time, _ = delete(da, [d.id for d in docs_to_delete]) + table.add_row( storage.title(), str(config.get(get_param(storage, 'M'), None)), From f31731c8deaec3a02d5a3eba6b782f5e943f7c40 Mon Sep 17 00:00:00 2001 From: AnneY Date: Mon, 24 Oct 2022 13:55:47 +0800 Subject: [PATCH 32/33] fix: update params, save tmp results, fix plot --- scripts/benchmarking_utils.py | 41 ++++++++++++++--------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py index 485503599a2..1bd6255051e 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -172,12 +172,12 @@ def get_configuration_storage_backends(argparse, D, random=True): storage_backends['annlite']['hnsw_config'] = { 'max_connection': [16, 32], 'ef_construction': [128, 256], - 'ef_search': [16, 32], + 'ef_search': [64, 128, 256], } storage_backends['qdrant']['storage_config']['distance'] = 'euclidean' storage_backends['qdrant']['hnsw_config'] = { - 'm': [8, 12, 16, 32], + 'm': [12, 16, 32], 'ef_construct': [32, 64, 128], } @@ -185,7 +185,7 @@ def get_configuration_storage_backends(argparse, D, random=True): storage_backends['weaviate']['hnsw_config'] = { 'max_connections': [16, 32], 'ef_construction': [128, 256], - 'ef': [16, 32], + 'ef': [64, 128, 256], } storage_backends['elasticsearch']['storage_config']['distance'] = 'l2_norm' @@ -198,7 +198,7 @@ def get_configuration_storage_backends(argparse, D, random=True): storage_backends['redis']['hnsw_config'] = { 'm': [16, 32], 'ef_construction': [128, 256], - 'ef_runtime': [16, 32], + 'ef_runtime': [64, 128, 256], } for storage in args.exclude_backends.split(','): @@ -529,7 +529,7 @@ def run_benchmark_sift( fmt(read_time * 1000, 'ms'), 
fmt(update_time * 1000, 'ms'), fmt(delete_time * 1000, 'ms'), - fmt(find_by_vector_time, 's'), + fmt(find_by_vector_time * 1000, 'ms'), '{:.3f}'.format(recall_at_k), fmt(find_by_condition_time, 's'), ) @@ -547,7 +547,10 @@ def run_benchmark_sift( find_by_condition_time, ] + # print and store benchmark time console.print(table) + benchmark_df.to_csv(f'benchmark-seconds-{storage}.csv') + da.clear() del da except Exception as e: @@ -586,9 +589,12 @@ def get_param(storage, param): def plot_results_sift(storages): - fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5)) + fig, ax = plt.subplots() storages = list(storages) - storages.remove('memory') + if 'memory' in storages: + storages.remove('memory') + if 'sqlite' in storages: + storages.remove('sqlite') for storage in storages: df = pd.read_csv(f'benchmark-qps-{storage}.csv') @@ -596,28 +602,13 @@ def plot_results_sift(storages): df.sort_values(by=['Recall'], inplace=True) df.plot( style='.-', - ax=ax1, + ax=ax, x='Recall', - y='Query (R)', + y='Find by vector', ylabel='Queries per second (1/s)', label=storage, ) - ax1.set_title('Recall/Queries per second (1/s)', fontsize=18) - - for storage in storages: - df = pd.read_csv(f'benchmark-seconds-{storage}.csv') - df.rename(columns={'Recall at k=10 for vector search': 'Recall'}, inplace=True) - df.sort_values(by=['Recall'], inplace=True) - df.plot( - style='.-', - ax=ax2, - x='Recall', - y='Indexing time (C)', - ylabel='Indexing time (s)', - label=storage, - ) - - ax2.set_title('Recall/Indexing time (s)', fontsize=18) + ax.set_title('Recall/Queries per second (1/s)', fontsize=18) plt.savefig('benchmark.png') From 1b44745c234d80422e0f13a4580f77f4e8972329 Mon Sep 17 00:00:00 2001 From: AnneY Date: Mon, 24 Oct 2022 14:42:23 +0800 Subject: [PATCH 33/33] fix: elasticsearch add num_candidates --- scripts/benchmarking_utils.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py 
index 1bd6255051e..c8daf5e0f10 100644 --- a/scripts/benchmarking_utils.py +++ b/scripts/benchmarking_utils.py @@ -190,8 +190,9 @@ def get_configuration_storage_backends(argparse, D, random=True): storage_backends['elasticsearch']['storage_config']['distance'] = 'l2_norm' storage_backends['elasticsearch']['hnsw_config'] = { - 'm': [8, 12, 16], - 'ef_construction': [16, 32, 64, 128], + 'm': [16, 32], + 'ef_construction': [128, 256], + 'num_candidates': [64, 128, 256], } storage_backends['redis']['storage_config']['distance'] = 'L2' @@ -477,7 +478,11 @@ def run_benchmark_sift( da = DocumentArray(storage=storage) else: config.update(storage_config) + if storage == 'elasticsearch': + num_candidates = config.pop('num_candidates') da = DocumentArray(storage=storage, config=config) + if storage == 'elasticsearch': + config['num_candidates'] = num_candidates console.print(f'\tindexing {n_index} docs ...') create_time, _ = create(da, docs) @@ -502,7 +507,14 @@ def run_benchmark_sift( recall_at_k_values = [] find_by_vector_times = [] for i, query in enumerate(vector_queries): - find_by_vector_time, results = find_by_vector(da, query, limit=K) + if storage == 'elasticsearch': + find_by_vector_time, results = find_by_vector( + da, query, limit=K, num_candidates=config['num_candidates'] + ) + else: + find_by_vector_time, results = find_by_vector( + da, query, limit=K + ) find_by_vector_times.append(find_by_vector_time) recall_at_k_values.append(recall(results, ground_truth[i], K)) @@ -572,7 +584,11 @@ def run_benchmark_sift( 'EF_CONSTRUCTION': 'ef_construction', 'EF_RUNTIME': 'ef', }, - 'elasticsearch': {'M': 'm', 'EF_CONSTRUCTION': 'ef_construction'}, + 'elasticsearch': { + 'M': 'm', + 'EF_CONSTRUCTION': 'ef_construction', + 'EF_RUNTIME': 'num_candidates', + }, 'redis': { 'M': 'm', 'EF_CONSTRUCTION': 'ef_construction',