From e8e6fbc85521185c8d7ea67de350424e944c798b Mon Sep 17 00:00:00 2001
From: David Buchaca
Date: Mon, 25 Apr 2022 11:26:09 +0200
Subject: [PATCH 1/9] feat: add benchmark adapted for sift1m

---
 scripts/benchmarking_sift_1M.py | 308 ++++++++++++++++++++++++++++++++
 1 file changed, 308 insertions(+)
 create mode 100644 scripts/benchmarking_sift_1M.py

diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py
new file mode 100644
index 00000000000..fedd3621e93
--- /dev/null
+++ b/scripts/benchmarking_sift_1M.py
@@ -0,0 +1,308 @@
+import argparse
+import functools
+import random
+from time import perf_counter
+
+import numpy as np
+import pandas as pd
+import seaborn as sns
+import matplotlib.pyplot as plt
+
+from docarray import Document, DocumentArray
+from rich.console import Console
+from rich.table import Table
+import h5py
+import os
+
+n_query = 1
+D = 128
+TENSOR_SHAPE = (512, 256)
+K = 10
+n_vector_queries = 1000
+np.random.seed(123)
+DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5')
+dataset = h5py.File(DATASET_PATH, 'r')
+n_index_values = [len(X_tr)]
+
+X_tr = dataset['train'][0:]
+X_te = dataset['test'][0:]
+
+parser = argparse.ArgumentParser()
+parser.add_argument(
+    '--default-hnsw',
+    help='Whether to use default HNSW configurations',
+    action='store_true',
+)
+args = parser.parse_args()
+
+times = {}
+
+
+def timer(func):
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        start = perf_counter()
+        res = func(*args, **kwargs)
+        return (perf_counter() - start, res)
+
+    return wrapper
+
+
+@timer
+def create(da, docs):
+    da.extend(docs)
+
+
+@timer
+def read(da, ids):
+    da[ids]
+
+
+@timer
+def update(da, docs):
+    da[[d.id for d in docs]] = docs
+
+
+@timer
+def delete(da, ids):
+    del da[ids]
+
+
+@timer
+def find_by_condition(da, query):
+    da.find(query)
+
+
+@timer
+def find_by_vector(da, query):
+    return da.find(query, limit=K)
+
+
+def get_docs(X_tr):
+    return [
+        Document(
+            embedding=x,
+            # tensor=np.random.rand(*tensor_shape),
+            tags={'i': int(i)},
+        )
+        for i, x in enumerate(X_tr)
+    ]
+
+
+def fmt(value, unit):
+    return '{:.3f} {}'.format(value, unit)
+
+
+def recall(predicted, relevant, eval_at):
+    if eval_at == 0:
+        return 0.0
+    predicted_at_k = predicted[:eval_at]
+    n_predicted_and_relevant = len(
+        set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id']))
+    )
+    return n_predicted_and_relevant / len(relevant)
+
+
+if args.default_hnsw:
+    storage_backends = [
+        ('memory', None),
+        ('sqlite', None),
+        (
+            'annlite',
+            {'n_dim': D},
+        ),
+        ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}),
+        ('weaviate', {'n_dim': D}),
+        ('elasticsearch', {'n_dim': D}),
+    ]
+else:
+    storage_backends = [
+        ('memory', None),
+        ('sqlite', None),
+        (
+            'annlite',
+            {
+                'n_dim': D,
+                'ef_construction': 100,
+                'ef_search': 100,
+                'max_connection': 16,
+            },
+        ),
+        ('qdrant', {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16}),
+        (
+            'weaviate',
+            {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16},
+        ),
+        ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}),
+    ]
+
+table = Table(
+    title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}'
+)
+benchmark_df = pd.DataFrame(
+    {
+        'Storage Backend': [],
+        'Indexing time (C)': [],
+        'Query (R)': [],
+        'Update (U)': [],
+        'Delete (D)': [],
+        'Find by vector': [],
+        f'Recall at k={K} for vector search': [],
+        'Find by condition': [],
+    }
+)
+
+for col in benchmark_df.columns:
+    table.add_column(col)
+
+console = Console()
+find_by_vector_values = {str(n_index): [] for n_index in n_index_values}
+create_values = {str(n_index): [] for n_index in n_index_values}
+
+console.print(f'Reading dataset')
+docs = get_docs(X_tr)
+docs_to_delete = random.sample(docs, n_query)
+docs_to_update = random.sample(docs, n_query)
+vector_queries = [x for x in X_te]
+ground_truth = []
+
+for idx, n_index in enumerate(n_index_values):
+    for backend, config in storage_backends:
+        try:
+            console.print('Backend:', backend.title())
+            # for n_i in n_index:
+            if not config:
+                da = DocumentArray(storage=backend)
+            else:
+                da = DocumentArray(storage=backend, config=config)
+            console.print(f'indexing {n_index} docs ...')
+            create_time, _ = create(da, docs)
+            # for n_q in n_query:
+            console.print(f'reading {n_query} docs ...')
+            read_time, _ = read(
+                da,
+                random.sample([d.id for d in docs], n_query),
+            )
+            console.print(f'updating {n_query} docs ...')
+            update_time, _ = update(da, docs_to_update)
+            console.print(f'deleting {n_query} docs ...')
+            delete_time, _ = delete(da, [d.id for d in docs_to_delete])
+            console.print(
+                f'finding {n_query} docs by vector averaged {n_vector_queries} times ...'
+            )
+            if backend == 'sqlite':
+                find_by_vector_time, result = find_by_vector(da, vector_queries[0])
+                recall_at_k = recall(result, ground_truth[0], K)
+            else:
+                recall_at_k_values = []
+                find_by_vector_times = []
+                for i, query in enumerate(vector_queries):
+                    find_by_vector_time, results = find_by_vector(da, query)
+                    find_by_vector_times.append(find_by_vector_time)
+                    if backend == 'memory':
+                        ground_truth.append(results)
+                        recall_at_k_values.append(1)
+                    else:
+                        recall_at_k_values.append(recall(results, ground_truth[i], K))
+                recall_at_k = sum(recall_at_k_values) / len(recall_at_k_values)
+                find_by_vector_time = sum(find_by_vector_times) / len(
+                    find_by_vector_times
+                )
+            console.print(f'finding {n_query} docs by condition ...')
+            find_by_condition_time, _ = find_by_condition(da, {'tags__i': {'$eq': 0}})
+            if idx == len(n_index_values) - 1:
+                table.add_row(
+                    backend.title(),
+                    fmt(create_time, 's'),
+                    fmt(read_time * 1000, 'ms'),
+                    fmt(update_time * 1000, 'ms'),
+                    fmt(delete_time * 1000, 'ms'),
+                    fmt(find_by_vector_time, 's'),
+                    '{:.3f}'.format(recall_at_k),
+                    fmt(find_by_condition_time, 's'),
+                )
+                benchmark_df.append(
+                    pd.DataFrame(
+                        [
+                            [
+                                backend.title(),
+                                create_time,
+                                read_time,
+                                update_time,
+                                delete_time,
+                                find_by_vector_time,
+                                recall_at_k,
+                                find_by_condition_time,
+                            ]
+                        ],
+                        columns=benchmark_df.columns,
+                    )
+                )
+            find_by_vector_values[str(n_index)].append(find_by_vector_time)
+            create_values[str(n_index)].append(create_time)
+        except Exception as e:
+            console.print(f'Storage Backend {backend} failed: {e}')
+
+
+find_df = pd.DataFrame(find_by_vector_values)
+find_df.index = [backend for backend, _ in storage_backends]
+find_df = find_df.drop(['sqlite'])
+print(find_df)
+fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5))
+
+find_df.plot(
+    kind="bar",
+    ax=ax1,
+    fontsize=16,
+    color=sns.color_palette('muted')[1:4],
+    # title='Find by vector per backend and dataset size',
+    # ylabel='seconds',
+    rot=0,
+)
+ax1.set_ylabel('seconds', fontsize=18)
+ax1.set_title('Find by vector per backend and dataset size', fontsize=18)
+
+threshold = 0.3
+ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--')
+
+create_df = pd.DataFrame(create_values)
+create_df.index = [backend for backend, _ in storage_backends]
+
+create_df = create_df.drop(['memory'])
+print(create_df)
+create_df.plot(
+    kind="bar",
+    ax=ax2,
+    fontsize=16,
+    color=sns.color_palette('muted')[1:4],
+    # title='Indexing per backend and dataset size',
+    # ylabel='seconds',
+    rot=0,
+)
+
+ax2.set_ylabel('seconds', fontsize=18)
+ax2.set_title('Indexing per backend and dataset size', fontsize=18)
+
+plt.tight_layout()
+ax1.legend(fontsize=15)
+ax2.legend(fontsize=15)
+
+plt.savefig('benchmark.svg')
+console.print(table)
+
+benchmark_df.to_csv('benchmark-seconds.csv')
+
+benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply(
+    lambda value: 1_000_000 / value
+)
+benchmark_df['Query (R)'] = benchmark_df['Query (R)'].apply(lambda value: 1 / value)
+benchmark_df['Update (U)'] = benchmark_df['Update (U)'].apply(lambda value: 1 / value)
+benchmark_df['Delete (D)'] = benchmark_df['Delete (D)'].apply(lambda value: 1 / value)
+benchmark_df['Find by vector'] = benchmark_df['Find by vector'].apply(
+    lambda value: 1 / value
+)
+benchmark_df['Find by condition'] = benchmark_df['Find by condition'].apply(
+    lambda value: 1 / value
+)
+
+benchmark_df.to_csv('benchmark-qps.csv')
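Note: `benchmark_df.append(...)` in the loop above discards its return value. `pandas.DataFrame.append` returns a new frame rather than mutating in place (and is deprecated in recent pandas), so `benchmark-seconds.csv` ends up with only the header row. A minimal sketch of the accumulate-then-build pattern that would capture the rows (the `rows` list and sample values are illustrative, not from the patch):

    import pandas as pd

    columns = [
        'Storage Backend', 'Indexing time (C)', 'Query (R)', 'Update (U)',
        'Delete (D)', 'Find by vector', 'Recall at k=10 for vector search',
        'Find by condition',
    ]
    rows = []
    # inside the backend loop, collect one plain list per backend:
    rows.append(['Memory', 1.234, 0.003, 0.004, 0.002, 0.5, 1.0, 0.1])
    # after the loop, build the frame once instead of appending in place:
    benchmark_df = pd.DataFrame(rows, columns=columns)
    benchmark_df.to_csv('benchmark-seconds.csv')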
From 42ca6a0e42aaad9a2960d9a28dd16e33c07a931a Mon Sep 17 00:00:00 2001
From: davidbp
Date: Mon, 25 Apr 2022 12:11:56 +0200
Subject: [PATCH 2/9] fix: order

---
 scripts/benchmarking_sift_1M.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py
index fedd3621e93..d11cd285d30 100644
--- a/scripts/benchmarking_sift_1M.py
+++ b/scripts/benchmarking_sift_1M.py
@@ -22,10 +22,10 @@
 np.random.seed(123)
 DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5')
 dataset = h5py.File(DATASET_PATH, 'r')
-n_index_values = [len(X_tr)]
 
-X_tr = dataset['train'][0:]
-X_te = dataset['test'][0:]
+X_tr = dataset['train'][0:1000]
+X_te = dataset['test'][0:10]
 
+n_index_values = [len(X_tr)]
 
 parser = argparse.ArgumentParser()
 parser.add_argument(
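Note: the reorder above fixes a NameError in patch 1, which computed `n_index_values = [len(X_tr)]` before `X_tr` was assigned. It also changes the slicing: `dataset['train']` is a lazy `h5py.Dataset`, while `dataset['train'][0:1000]` copies that range into a NumPy array. The file follows the ann-benchmarks layout, which also ships precomputed ground truth used by later patches — a sketch, assuming the standard ann-benchmarks keys:

    import h5py

    with h5py.File('sift-128-euclidean.hdf5', 'r') as f:
        print(list(f.keys()))      # typically ['distances', 'neighbors', 'test', 'train']
        train = f['train']         # lazy h5py.Dataset; nothing read from disk yet
        print(train.shape)         # (1000000, 128) for SIFT1M
        sample = train[0:1000]     # slicing materializes a NumPy array
        gt = f['neighbors'][0:10]  # precomputed true-neighbor ids per test query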
From 0bbc88c63984ef48ead7a8fbd1c7ee402ec2fbc5 Mon Sep 17 00:00:00 2001
From: davidbp
Date: Mon, 25 Apr 2022 13:07:51 +0200
Subject: [PATCH 3/9] fix: drop sqlite in table

---
 scripts/benchmarking_sift_1M.py | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py
index d11cd285d30..4dbab944477 100644
--- a/scripts/benchmarking_sift_1M.py
+++ b/scripts/benchmarking_sift_1M.py
@@ -18,14 +18,14 @@
 D = 128
 TENSOR_SHAPE = (512, 256)
 K = 10
-n_vector_queries = 1000
 np.random.seed(123)
 DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5')
 dataset = h5py.File(DATASET_PATH, 'r')
 
-X_tr = dataset['train'][0:1000]
-X_te = dataset['test'][0:10]
+X_tr = dataset['train']
+X_te = dataset['test']
 
 n_index_values = [len(X_tr)]
+n_vector_queries = len(X_te)
 
 parser = argparse.ArgumentParser()
 parser.add_argument(
@@ -169,26 +169,26 @@ def recall(predicted, relevant, eval_at):
 for idx, n_index in enumerate(n_index_values):
     for backend, config in storage_backends:
         try:
-            console.print('Backend:', backend.title())
+            console.print('\nBackend:', backend.title())
             # for n_i in n_index:
             if not config:
                 da = DocumentArray(storage=backend)
             else:
                 da = DocumentArray(storage=backend, config=config)
-            console.print(f'indexing {n_index} docs ...')
+            console.print(f'\tindexing {n_index} docs ...')
             create_time, _ = create(da, docs)
             # for n_q in n_query:
-            console.print(f'reading {n_query} docs ...')
+            console.print(f'\treading {n_query} docs ...')
             read_time, _ = read(
                 da,
                 random.sample([d.id for d in docs], n_query),
             )
-            console.print(f'updating {n_query} docs ...')
+            console.print(f'\tupdating {n_query} docs ...')
             update_time, _ = update(da, docs_to_update)
-            console.print(f'deleting {n_query} docs ...')
+            console.print(f'\tdeleting {n_query} docs ...')
             delete_time, _ = delete(da, [d.id for d in docs_to_delete])
             console.print(
-                f'finding {n_query} docs by vector averaged {n_vector_queries} times ...'
+                f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...'
             )
             if backend == 'sqlite':
                 find_by_vector_time, result = find_by_vector(da, vector_queries[0])
                 recall_at_k = recall(result, ground_truth[0], K)
@@ -208,7 +208,7 @@ def recall(predicted, relevant, eval_at):
                 find_by_vector_time = sum(find_by_vector_times) / len(
                     find_by_vector_times
                 )
-            console.print(f'finding {n_query} docs by condition ...')
+            console.print(f'\tfinding {n_query} docs by condition ...')
             find_by_condition_time, _ = find_by_condition(da, {'tags__i': {'$eq': 0}})
             if idx == len(n_index_values) - 1:
                 table.add_row(
@@ -240,13 +240,13 @@ def recall(predicted, relevant, eval_at):
                 )
             find_by_vector_values[str(n_index)].append(find_by_vector_time)
             create_values[str(n_index)].append(create_time)
+            console.print(table)
         except Exception as e:
             console.print(f'Storage Backend {backend} failed: {e}')
 
-
 find_df = pd.DataFrame(find_by_vector_values)
+storage_backends.remove(('sqlite', None))
 find_df.index = [backend for backend, _ in storage_backends]
-find_df = find_df.drop(['sqlite'])
 print(find_df)
 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5))
 
@@ -260,7 +260,7 @@ def recall(predicted, relevant, eval_at):
     rot=0,
 )
 ax1.set_ylabel('seconds', fontsize=18)
-ax1.set_title('Find by vector per backend and dataset size', fontsize=18)
+ax1.set_title('Find by vector per backend', fontsize=18)
 
 threshold = 0.3
 ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--')
@@ -281,11 +281,11 @@ def recall(predicted, relevant, eval_at):
 )
 
 ax2.set_ylabel('seconds', fontsize=18)
-ax2.set_title('Indexing per backend and dataset size', fontsize=18)
+ax2.set_title('Indexing per backend', fontsize=18)
 
 plt.tight_layout()
-ax1.legend(fontsize=15)
-ax2.legend(fontsize=15)
+#ax1.legend(fontsize=15)
+#ax2.legend(fontsize=15)
 
 plt.savefig('benchmark.svg')
 console.print(table)

From a6584d43ec4fc626e45cbc286782dd82e79500b0 Mon Sep 17 00:00:00 2001
From: davidbp
Date: Tue, 26 Apr 2022 07:55:40 +0200
Subject: [PATCH 4/9] fix: delete docarray once finished

---
 scripts/benchmarking_sift_1M.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py
index 4dbab944477..c9da1174a37 100644
--- a/scripts/benchmarking_sift_1M.py
+++ b/scripts/benchmarking_sift_1M.py
@@ -241,6 +241,8 @@ def recall(predicted, relevant, eval_at):
             find_by_vector_values[str(n_index)].append(find_by_vector_time)
             create_values[str(n_index)].append(create_time)
             console.print(table)
+            da.clear()
+            del da
         except Exception as e:
             console.print(f'Storage Backend {backend} failed: {e}')
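Note: the `da.clear()` / `del da` added in patch 4 frees the backend between runs, but it only executes when the backend succeeds; an exception skips the cleanup and leaves the indexed documents behind. A try/finally sketch that guarantees cleanup either way (`benchmark_backend` and `body` are illustrative names, not from the patch):

    from docarray import DocumentArray

    def benchmark_backend(backend, config, body):
        """Run `body(da)` against one storage backend, always cleaning up."""
        da = (
            DocumentArray(storage=backend, config=config)
            if config
            else DocumentArray(storage=backend)
        )
        try:
            return body(da)
        finally:
            da.clear()  # remove indexed docs from the backend store
            del da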
From c543f9526226881c1d2844c743cdb67cc260786f Mon Sep 17 00:00:00 2001
From: jina
Date: Tue, 26 Apr 2022 09:56:53 +0200
Subject: [PATCH 5/9] fix: correctly drop sqlite

---
 scripts/benchmarking_sift_1M.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py
index c9da1174a37..7aa07519173 100644
--- a/scripts/benchmarking_sift_1M.py
+++ b/scripts/benchmarking_sift_1M.py
@@ -1,3 +1,4 @@
+
 import argparse
 import functools
 import random
@@ -19,11 +20,12 @@
 TENSOR_SHAPE = (512, 256)
 K = 10
 np.random.seed(123)
-DATASET_PATH = os.path.join(os.path.expanduser('~'), 'Desktop/ANN_SIFT1M/sift-128-euclidean.hdf5')
+DATASET_PATH = './sift-128-euclidean.hdf5'
 dataset = h5py.File(DATASET_PATH, 'r')
 
 X_tr = dataset['train']
 X_te = dataset['test']
+
 n_index_values = [len(X_tr)]
 n_vector_queries = len(X_te)
@@ -247,9 +249,9 @@ def recall(predicted, relevant, eval_at):
             console.print(f'Storage Backend {backend} failed: {e}')
 
 find_df = pd.DataFrame(find_by_vector_values)
-storage_backends.remove(('sqlite', None))
 find_df.index = [backend for backend, _ in storage_backends]
 print(find_df)
+find_df = find_df.drop(['sqlite'])
 fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5))
 
 find_df.plot(

From ed511c90bdac93cd589f4559cc4793370ed4813b Mon Sep 17 00:00:00 2001
From: David Buchaca Prats
Date: Tue, 26 Apr 2022 13:11:11 +0200
Subject: [PATCH 6/9] feat: speedup benchmark from groundtruth

---
 scripts/benchmarking_dataset.py | 232 ++++++++++++++++++++++++++++++++
 scripts/benchmarking_utils.py   | 157 +++++++++++++++++++++
 2 files changed, 389 insertions(+)
 create mode 100644 scripts/benchmarking_dataset.py
 create mode 100644 scripts/benchmarking_utils.py

diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py
new file mode 100644
index 00000000000..b309ad840aa
--- /dev/null
+++ b/scripts/benchmarking_dataset.py
@@ -0,0 +1,232 @@
+import argparse
+import random
+
+import pandas as pd
+
+from docarray import DocumentArray
+from rich.console import Console
+from rich.table import Table
+import h5py
+import numpy as np
+
+from benchmarking_utils import (
+    create,
+    read,
+    update,
+    delete,
+    find_by_condition,
+    find_by_vector,
+    get_docs,
+    fmt,
+    recall_from_numpy,
+    save_benchmark_df,
+    plot_results,
+)
+
+
+def get_configuration_storage_backends(argparse):
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        '--default-hnsw',
+        help='Whether to use default HNSW configurations',
+        action='store_true',
+    )
+
+    args = parser.parse_args()
+
+    if args.default_hnsw:
+        storage_backends = [
+            ('memory', None),
+            ('sqlite', None),
+            (
+                'annlite',
+                {'n_dim': D},
+            ),
+            ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}),
+            ('weaviate', {'n_dim': D}),
+            ('elasticsearch', {'n_dim': D}),
+        ]
+    else:
+        storage_backends = [
+            ('memory', None),
+            ('sqlite', None),
+            (
+                'annlite',
+                {
+                    'n_dim': D,
+                    'ef_construction': 100,
+                    'ef_search': 100,
+                    'max_connection': 16,
+                },
+            ),
+            (
+                'qdrant',
+                {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16},
+            ),
+            (
+                'weaviate',
+                {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16},
+            ),
+            ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}),
+        ]
+    return storage_backends
+
+
+def run_benchmark(
+    X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends
+):
+    table = Table(
+        title=f'DocArray Benchmarking n_index={n_index_values[-1]} n_query={n_query} D={D} K={K}'
+    )
+    benchmark_df = pd.DataFrame(
+        {
+            'Storage Backend': [],
+            'Indexing time (C)': [],
+            'Query (R)': [],
+            'Update (U)': [],
+            'Delete (D)': [],
+            'Find by vector': [],
+            f'Recall at k={K} for vector search': [],
+            'Find by condition': [],
+        }
+    )
+
+    for col in benchmark_df.columns:
+        table.add_column(col)
+
+    console = Console()
+    find_by_vector_values = {str(n_index): [] for n_index in n_index_values}
+    create_values = {str(n_index): [] for n_index in n_index_values}
+
+    console.print(f'Reading dataset')
+    docs = get_docs(X_tr)
+    docs_to_delete = random.sample(docs, n_query)
+    docs_to_update = random.sample(docs, n_query)
+    vector_queries = [x for x in X_te]
+    ground_truth = []
+
+    for idx, n_index in enumerate(n_index_values):
+        for backend, config in storage_backends:
+            try:
+                console.print('\nBackend:', backend.title())
+                # for n_i in n_index:
+                if not config:
+                    da = DocumentArray(storage=backend)
+                else:
+                    da = DocumentArray(storage=backend, config=config)
+                console.print(f'\tindexing {n_index} docs ...')
+                create_time, _ = create(da, docs)
+                # for n_q in n_query:
+                console.print(f'\treading {n_query} docs ...')
+                read_time, _ = read(
+                    da,
+                    random.sample([d.id for d in docs], n_query),
+                )
+                console.print(f'\tupdating {n_query} docs ...')
+                update_time, _ = update(da, docs_to_update)
+                console.print(f'\tdeleting {n_query} docs ...')
+                delete_time, _ = delete(da, [d.id for d in docs_to_delete])
+                console.print(
+                    f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...'
+                )
+                if backend == 'memory':
+                    find_by_vector_time, _ = find_by_vector(
+                        da, vector_queries[0], limit=K
+                    )
+                    ground_truth = [
+                        x for x in dataset['neighbors'][0 : len(vector_queries)]
+                    ]
+                    recall_at_k = 1
+                elif backend == 'sqlite':
+                    find_by_vector_time, result = find_by_vector(
+                        da, vector_queries[0], limit=K
+                    )
+                    recall_at_k = 1
+                else:
+                    recall_at_k_values = []
+                    find_by_vector_times = []
+                    for i, query in enumerate(vector_queries):
+                        find_by_vector_time, results = find_by_vector(
+                            da, query, limit=K
+                        )
+                        find_by_vector_times.append(find_by_vector_time)
+                        recall_at_k_values.append(
+                            recall_from_numpy(
+                                np.array(results[:, 'tags__i']), ground_truth[i], K
+                            )
+                        )
+
+                    recall_at_k = np.mean(recall_at_k_values)
+                    find_by_vector_time = np.mean(find_by_vector_times)
+
+                console.print(f'\tfinding {n_query} docs by condition ...')
+                find_by_condition_time, _ = find_by_condition(
+                    da, {'tags__i': {'$eq': 0}}
+                )
+                if idx == len(n_index_values) - 1:
+                    table.add_row(
+                        backend.title(),
+                        fmt(create_time, 's'),
+                        fmt(read_time * 1000, 'ms'),
+                        fmt(update_time * 1000, 'ms'),
+                        fmt(delete_time * 1000, 'ms'),
+                        fmt(find_by_vector_time, 's'),
+                        '{:.3f}'.format(recall_at_k),
+                        fmt(find_by_condition_time, 's'),
+                    )
+                    benchmark_df.append(
+                        pd.DataFrame(
+                            [
+                                [
+                                    backend.title(),
+                                    create_time,
+                                    read_time,
+                                    update_time,
+                                    delete_time,
+                                    find_by_vector_time,
+                                    recall_at_k,
+                                    find_by_condition_time,
+                                ]
+                            ],
+                            columns=benchmark_df.columns,
+                        )
+                    )
+
+                # store find_by_vector time
+                find_by_vector_values[str(n_index)].append(find_by_vector_time)
+                create_values[str(n_index)].append(create_time)
+                console.print(table)
+                da.clear()
+                del da
+            except Exception as e:
+                console.print(f'Storage Backend {backend} failed: {e}')
+
+    console.print(table)
+    return find_by_vector_values, create_values, benchmark_df
+
+
+if __name__ == "__main__":
+
+    # Parameters settable by the user
+    n_query = 1
+    K = 10
+    DATASET_PATH = 'sift-128-euclidean.hdf5'
+    np.random.seed(123)
+
+    # Variables gathered from the dataset
+    dataset = h5py.File(DATASET_PATH, 'r')
+    X_tr = dataset['train']
+    X_te = dataset['test']
+    D = X_tr.shape[1]
+    n_index_values = [len(X_tr)]
+    n_vector_queries = len(X_te)
+
+    # Benchmark
+    storage_backends = get_configuration_storage_backends(argparse)
+    find_by_vector_values, create_values, benchmark_df = run_benchmark(
+        X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends
+    )
+    plot_results(
+        find_by_vector_values, storage_backends, create_values, plot_legend=False
+    )
+    save_benchmark_df(benchmark_df)
diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py
new file mode 100644
index 00000000000..a1357d0932a
--- /dev/null
+++ b/scripts/benchmarking_utils.py
@@ -0,0 +1,157 @@
+import functools
+from time import perf_counter
+from docarray import Document
+import numpy as np
+import seaborn as sns
+import matplotlib.pyplot as plt
+import pandas as pd
+
+
+def timer(func):
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        start = perf_counter()
+        res = func(*args, **kwargs)
+        return (perf_counter() - start, res)
+
+    return wrapper
+
+
+@timer
+def create(da, docs):
+    da.extend(docs)
+
+
+@timer
+def read(da, ids):
+    da[ids]
+
+
+@timer
+def update(da, docs):
+    da[[d.id for d in docs]] = docs
+
+
+@timer
+def delete(da, ids):
+    del da[ids]
+
+
+@timer
+def find_by_condition(da, query):
+    da.find(query)
+
+
+@timer
+def find_by_vector(da, query, limit):
+    return da.find(query, limit=limit)
+
+
+def get_docs(X_tr):
+    return [
+        Document(
+            embedding=x,
+            # tensor=np.random.rand(*tensor_shape),
+            tags={'i': int(i)},
+        )
+        for i, x in enumerate(X_tr)
+    ]
+
+
+def fmt(value, unit):
+    return '{:.3f} {}'.format(value, unit)
+
+
+def recall(predicted, relevant, eval_at):
+    if eval_at == 0:
+        return 0.0
+    predicted_at_k = predicted[:eval_at]
+    n_predicted_and_relevant = len(
+        set(predicted_at_k[:, 'id']).intersection(set(relevant[:, 'id']))
+    )
+    return n_predicted_and_relevant / len(relevant)
+
+
+def recall_from_numpy(predicted: np.ndarray, relevant: np.ndarray, eval_at: int):
+    if eval_at == 0:
+        return 0.0
+    predicted_at_k = predicted[:eval_at]
+    n_predicted_and_relevant = len(set(predicted_at_k).intersection(set(relevant)))
+    return n_predicted_and_relevant / len(relevant)
+
+
+def save_benchmark_df(benchmark_df):
+    benchmark_df.to_csv('benchmark-seconds.csv')
+
+    benchmark_df['Indexing time (C)'] = benchmark_df['Indexing time (C)'].apply(
+        lambda value: 1_000_000 / value
+    )
+    benchmark_df['Query (R)'] = benchmark_df['Query (R)'].apply(lambda value: 1 / value)
+    benchmark_df['Update (U)'] = benchmark_df['Update (U)'].apply(
+        lambda value: 1 / value
+    )
+    benchmark_df['Delete (D)'] = benchmark_df['Delete (D)'].apply(
+        lambda value: 1 / value
+    )
+    benchmark_df['Find by vector'] = benchmark_df['Find by vector'].apply(
+        lambda value: 1 / value
+    )
+    benchmark_df['Find by condition'] = benchmark_df['Find by condition'].apply(
+        lambda value: 1 / value
+    )
+
+    benchmark_df.to_csv('benchmark-qps.csv')
+
+
+def plot_results(
+    find_by_vector_values, storage_backends, create_values, plot_legend=True
+):
+    find_df = pd.DataFrame(find_by_vector_values)
+    find_df.index = [backend for backend, _ in storage_backends]
+    find_df = find_df.drop(['sqlite'])
+
+    print('\n\nQuery times')
+    print(find_df)
+
+    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(17, 5))
+
+    find_df.plot(
+        kind="bar",
+        ax=ax1,
+        fontsize=16,
+        color=sns.color_palette('muted')[1:4],
+        # title='Find by vector per backend and dataset size',
+        # ylabel='seconds',
+        rot=0,
+        legend=plot_legend,
+    )
+    ax1.set_ylabel('seconds', fontsize=18)
+    ax1.set_title('Find by vector per backend', fontsize=18)
+
+    threshold = 0.3
+    ax1.hlines(y=threshold, xmin=-20, xmax=20, linewidth=2, color='r', linestyle='--')
+
+    create_df = pd.DataFrame(create_values)
+    create_df.index = [backend for backend, _ in storage_backends]
+
+    create_df = create_df.drop(['memory'])
+    print('\n\nIndexing times')
+    print(create_df)
+    create_df.plot(
+        kind="bar",
+        ax=ax2,
+        fontsize=16,
+        color=sns.color_palette('muted')[1:4],
+        # title='Indexing per backend and dataset size',
+        # ylabel='seconds',
+        rot=0,
+        legend=plot_legend,
+    )
+
+    ax2.set_ylabel('seconds', fontsize=18)
+    ax2.set_title('Indexing per backend', fontsize=18)
+
+    plt.tight_layout()
+    # ax1.legend(fontsize=15)
+    # ax2.legend(fontsize=15)
+    plt.savefig('benchmark.svg')
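Note: every CRUD and query operation in `benchmarking_utils.py` is wrapped with `timer`, which turns each call into an `(elapsed_seconds, result)` pair — that is why all call sites unpack two values, e.g. `create_time, _ = create(da, docs)`. The same pattern in isolation:

    import functools
    from time import perf_counter

    def timer(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = perf_counter()
            res = func(*args, **kwargs)
            return (perf_counter() - start, res)
        return wrapper

    @timer
    def slow_sum(n):
        return sum(range(n))

    elapsed, total = slow_sum(1_000_000)  # unpack (seconds, result)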
From ba51b2381f2e9e583cb8e88dc294894949383f99 Mon Sep 17 00:00:00 2001
From: David Buchaca Prats
Date: Tue, 26 Apr 2022 13:16:34 +0200
Subject: [PATCH 7/9] refactor: sift1M

---
 scripts/benchmarking_sift_1M.py | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/scripts/benchmarking_sift_1M.py b/scripts/benchmarking_sift_1M.py
index 7aa07519173..aeaf57f6ff2 100644
--- a/scripts/benchmarking_sift_1M.py
+++ b/scripts/benchmarking_sift_1M.py
@@ -1,4 +1,3 @@
-
 import argparse
 import functools
 import random
@@ -23,8 +22,8 @@
 DATASET_PATH = './sift-128-euclidean.hdf5'
 dataset = h5py.File(DATASET_PATH, 'r')
 
-X_tr = dataset['train']
-X_te = dataset['test']
+X_tr = dataset['train'][0:1000]
+X_te = dataset['test'][0:100]
 
 n_index_values = [len(X_tr)]
 n_vector_queries = len(X_te)
@@ -288,8 +287,8 @@ def recall(predicted, relevant, eval_at):
 ax2.set_title('Indexing per backend', fontsize=18)
 
 plt.tight_layout()
-#ax1.legend(fontsize=15)
-#ax2.legend(fontsize=15)
+# ax1.legend(fontsize=15)
+# ax2.legend(fontsize=15)
 
 plt.savefig('benchmark.svg')
 console.print(table)
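Note: with the 1000-document slice reintroduced here for quick runs, the QPS conversion in `save_benchmark_df` becomes misleading: it computes indexing throughput as `1_000_000 / value`, hardcoding the full SIFT1M size. A size-aware sketch (the `n_docs` parameter is an addition, not in the patches):

    def indexing_qps(create_time_seconds: float, n_docs: int) -> float:
        """Documents indexed per second for the corpus size actually used."""
        return n_docs / create_time_seconds

    # indexing 1000 docs in 2.5 s is 400 docs/s, not 1_000_000 / 2.5 = 400_000
    print(indexing_qps(2.5, 1000))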
From aed73b182acd9b3509d1eaab02f6866aba9d5048 Mon Sep 17 00:00:00 2001
From: David Buchaca Prats
Date: Tue, 26 Apr 2022 17:16:04 +0200
Subject: [PATCH 8/9] fix: recall bug

---
 scripts/benchmarking_dataset.py | 30 +++++++++++++++++++++++-------
 scripts/benchmarking_utils.py   | 16 +++++++++++++++-
 2 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py
index b309ad840aa..6a7cc4d581f 100644
--- a/scripts/benchmarking_dataset.py
+++ b/scripts/benchmarking_dataset.py
@@ -37,7 +37,6 @@ def get_configuration_storage_backends(argparse):
     if args.default_hnsw:
         storage_backends = [
             ('memory', None),
-            ('sqlite', None),
             (
                 'annlite',
                 {'n_dim': D},
@@ -45,11 +44,11 @@ def get_configuration_storage_backends(argparse):
             ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}),
             ('weaviate', {'n_dim': D}),
             ('elasticsearch', {'n_dim': D}),
+            ('sqlite', None),
         ]
     else:
         storage_backends = [
             ('memory', None),
-            ('sqlite', None),
             (
                 'annlite',
                 {
@@ -68,12 +67,21 @@ def get_configuration_storage_backends(argparse):
             ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}),
+            ('sqlite', None),
         ]
     return storage_backends
 
 
 def run_benchmark(
-    X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends
+    X_tr,
+    X_te,
+    dataset,
+    n_index_values,
+    n_vector_queries,
+    n_query,
+    storage_backends,
+    K,
+    D,
 ):
     table = Table(
@@ -130,11 +138,11 @@ def run_benchmark(
                     f'\tfinding {n_query} docs by vector averaged {n_vector_queries} times ...'
                 )
                 if backend == 'memory':
-                    find_by_vector_time, _ = find_by_vector(
+                    find_by_vector_time, aux = find_by_vector(
                         da, vector_queries[0], limit=K
                     )
                     ground_truth = [
-                        x for x in dataset['neighbors'][0 : len(vector_queries)]
+                        x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]
                     ]
                     recall_at_k = 1
                 elif backend == 'sqlite':
@@ -152,7 +160,7 @@ def run_benchmark(
                         find_by_vector_times.append(find_by_vector_time)
                         recall_at_k_values.append(
                             recall_from_numpy(
-                                np.array(results[:, 'tags__i']), ground_truth[i], K
+                                np.array(results[:, 'tags__i']), ground_truth[i][0:K], K
                             )
                         )
@@ -224,7 +232,15 @@ def run_benchmark(
     # Benchmark
     storage_backends = get_configuration_storage_backends(argparse)
     find_by_vector_values, create_values, benchmark_df = run_benchmark(
-        X_tr, X_te, dataset, n_index_values, n_vector_queries, n_query, storage_backends
+        X_tr,
+        X_te,
+        dataset,
+        n_index_values,
+        n_vector_queries,
+        n_query,
+        storage_backends,
+        K,
+        D,
     )
     plot_results(

diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py
index a1357d0932a..dbb4e33a12f 100644
--- a/scripts/benchmarking_utils.py
+++ b/scripts/benchmarking_utils.py
@@ -1,5 +1,7 @@
 import functools
 from time import perf_counter
+from typing import Iterable
+
 from docarray import Document
 import numpy as np
 import seaborn as sns
 import matplotlib.pyplot as plt
 import pandas as pd
@@ -72,7 +74,19 @@ def recall(predicted, relevant, eval_at):
     return n_predicted_and_relevant / len(relevant)
 
 
-def recall_from_numpy(predicted: np.ndarray, relevant: np.ndarray, eval_at: int):
+def recall_from_numpy(predicted, relevant, eval_at: int):
+    """
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],5)
+    1.0
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],4)
+    0.8
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],3)
+    0.6
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],2)
+    0.4
+    >>> recall_from_numpy([1,2,3,4,5], [5,4,3,2,1],1)
+    0.2
+    """
     if eval_at == 0:
         return 0.0
     predicted_at_k = predicted[:eval_at]
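Note: the recall fix above truncates the stored ground truth to the top K. The metric is |predicted@K ∩ relevant| / |relevant|, and ann-benchmarks files typically store 100 true neighbors per query, so without `[0:K]` even a perfect top-10 answer could score at most 10/100. Worked numerically with the helper from `benchmarking_utils.py`:

    import numpy as np

    def recall_from_numpy(predicted, relevant, eval_at: int):
        if eval_at == 0:
            return 0.0
        predicted_at_k = predicted[:eval_at]
        return len(set(predicted_at_k).intersection(set(relevant))) / len(relevant)

    gt_full = np.arange(100)  # all precomputed neighbors for one query
    pred = np.arange(10)      # a perfect top-10 result
    print(recall_from_numpy(pred, gt_full, 10))       # 0.1 -- diluted by the long list
    print(recall_from_numpy(pred, gt_full[:10], 10))  # 1.0 -- after slicing to K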
From ea658c15c1c9e883505f20befccbac8411212a69 Mon Sep 17 00:00:00 2001
From: David Buchaca Prats
Date: Wed, 27 Apr 2022 17:39:05 +0200
Subject: [PATCH 9/9] refactor: get groundtruth before the loop

---
 scripts/benchmarking_dataset.py | 21 +++++++++------------
 scripts/benchmarking_utils.py   |  1 -
 2 files changed, 9 insertions(+), 13 deletions(-)

diff --git a/scripts/benchmarking_dataset.py b/scripts/benchmarking_dataset.py
index 6a7cc4d581f..c7e218085b2 100644
--- a/scripts/benchmarking_dataset.py
+++ b/scripts/benchmarking_dataset.py
@@ -36,19 +36,22 @@ def get_configuration_storage_backends(argparse):
 
     if args.default_hnsw:
         storage_backends = [
-            ('memory', None),
+            ('weaviate', {'n_dim': D}),
             (
                 'annlite',
                 {'n_dim': D},
             ),
             ('qdrant', {'n_dim': D, 'scroll_batch_size': 8}),
-            ('weaviate', {'n_dim': D}),
             ('elasticsearch', {'n_dim': D}),
             ('sqlite', None),
+            ('memory', None),
         ]
     else:
         storage_backends = [
-            ('memory', None),
+            (
+                'weaviate',
+                {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16},
+            ),
             (
                 'annlite',
                 {
@@ -62,12 +65,9 @@ def get_configuration_storage_backends(argparse):
                 'qdrant',
                 {'n_dim': D, 'scroll_batch_size': 8, 'ef_construct': 100, 'm': 16},
             ),
-            (
-                'weaviate',
-                {'n_dim': D, 'ef': 100, 'ef_construction': 100, 'max_connections': 16},
-            ),
             ('elasticsearch', {'n_dim': D, 'ef_construction': 100, 'm': 16}),
             ('sqlite', None),
+            ('memory', None),
         ]
     return storage_backends
@@ -111,7 +111,7 @@ def run_benchmark(
     console.print(f'Reading dataset')
     docs = get_docs(X_tr)
     docs_to_delete = random.sample(docs, n_query)
     docs_to_update = random.sample(docs, n_query)
     vector_queries = [x for x in X_te]
-    ground_truth = []
+    ground_truth = [x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]]
 
     for idx, n_index in enumerate(n_index_values):
         for backend, config in storage_backends:
@@ -141,9 +141,6 @@ def run_benchmark(
                     find_by_vector_time, aux = find_by_vector(
                         da, vector_queries[0], limit=K
                     )
-                    ground_truth = [
-                        x[0:K] for x in dataset['neighbors'][0 : len(vector_queries)]
-                    ]
                     recall_at_k = 1
                 elif backend == 'sqlite':
                     find_by_vector_time, result = find_by_vector(
@@ -157,7 +157,7 @@ def run_benchmark(
                         find_by_vector_times.append(find_by_vector_time)
                         recall_at_k_values.append(
                             recall_from_numpy(
-                                np.array(results[:, 'tags__i']), ground_truth[i][0:K], K
+                                np.array(results[:, 'tags__i']), ground_truth[i], K
                             )
                         )

diff --git a/scripts/benchmarking_utils.py b/scripts/benchmarking_utils.py
index dbb4e33a12f..386d9061bc3 100644
--- a/scripts/benchmarking_utils.py
+++ b/scripts/benchmarking_utils.py
@@ -123,7 +123,6 @@ def plot_results(
     find_df = pd.DataFrame(find_by_vector_values)
     find_df.index = [backend for backend, _ in storage_backends]
     find_df = find_df.drop(['sqlite'])
-
     print('\n\nQuery times')
     print(find_df)
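Note: both scripts expect `sift-128-euclidean.hdf5` in the working directory (patch 5 switched the path from `~/Desktop/ANN_SIFT1M/` to `./`). The file is the ann-benchmarks export of SIFT1M; a download sketch, assuming the URL published by the ann-benchmarks project is still live:

    import os
    import urllib.request

    URL = 'http://ann-benchmarks.com/sift-128-euclidean.hdf5'  # verify before relying on it
    DEST = './sift-128-euclidean.hdf5'

    if not os.path.exists(DEST):
        urllib.request.urlretrieve(URL, DEST)  # roughly a 500 MB download

After that, `python scripts/benchmarking_dataset.py` runs the tuned HNSW configurations, and `python scripts/benchmarking_dataset.py --default-hnsw` runs each backend's defaults.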