forked from pinecone-io/pinecone-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgenerate_usage.py
More file actions
executable file
·94 lines (70 loc) · 2.81 KB
/
generate_usage.py
File metadata and controls
executable file
·94 lines (70 loc) · 2.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import os
import random
import string
from pinecone.grpc import PineconeGRPC
def read_env_var(name):
    """Return the value of the environment variable ``name``.

    Args:
        name: name of the environment variable to look up.

    Returns:
        The variable's value as found in the process environment.

    Raises:
        Exception: if the variable is not set.
    """
    found = os.getenv(name)
    if found is not None:
        return found
    raise Exception(f"Environment variable {name} is not set")
def random_string(length):
    """Return a string of ``length`` randomly chosen lowercase ASCII letters.

    Each character is drawn independently with ``random.choice``, so letters
    may repeat. A ``length`` of zero yields the empty string.
    """
    alphabet = string.ascii_lowercase
    letters = [random.choice(alphabet) for _ in range(length)]
    return "".join(letters)
def random_embedding_values(dimension=2):
    """Return a list of ``dimension`` floats, each produced by ``random.random()``.

    Args:
        dimension: number of values to generate (defaults to 2).

    Returns:
        A new list of ``dimension`` floats in the half-open range [0.0, 1.0).
    """
    values = []
    for _ in range(dimension):
        values.append(random.random())
    return values
def write_gh_output(name, value):
    """Append a ``name=value`` line to the file named by ``GITHUB_OUTPUT``.

    The target path is read from the ``GITHUB_OUTPUT`` environment variable
    and the file is opened in append mode, so earlier lines are preserved.

    Args:
        name: key written on the left of the ``=``.
        value: value written on the right of the ``=`` (formatted as text).

    Raises:
        KeyError: if ``GITHUB_OUTPUT`` is not set.
    """
    output_path = os.environ["GITHUB_OUTPUT"]
    with open(output_path, "a") as output_file:
        output_file.write(f"{name}={value}\n")
# Dimensionality shared by the index this script creates and by every random
# vector it generates for upserts and queries.
DIMENSION = 1536 # common for openai embeddings
def create_index_if_not_exists(pc, index_name):
    """Create the index ``index_name`` unless ``pc`` already lists it.

    A newly created index uses the cosine metric, ``DIMENSION`` dimensions and
    a serverless spec whose cloud and region come from the ``CLOUD`` and
    ``REGION`` environment variables. Those variables are only read when the
    index actually has to be created.

    Args:
        pc: client object providing ``list_indexes()`` and ``create_index()``
            (``main`` passes a ``PineconeGRPC`` instance).
        index_name: name of the index to look for and, if absent, create.

    Raises:
        Exception: from ``read_env_var`` if ``CLOUD`` or ``REGION`` is unset
            when the index needs to be created.
    """
    existing_names = pc.list_indexes().names()
    if index_name in existing_names:
        return

    print(f"Index {index_name} does not exist, creating it")
    serverless_spec = {
        "cloud": read_env_var("CLOUD"),
        "region": read_env_var("REGION"),
    }
    pc.create_index(
        name=index_name,
        metric="cosine",
        dimension=DIMENSION,
        spec={"serverless": serverless_spec},
    )
# Module-level record of vector IDs that main() has upserted; main() samples
# from it to choose which IDs to fetch and delete.
upserted_ids = set()
def _sample_ids(ids, max_count):
    """Pick a random, non-repeating sample of between 1 and ``max_count`` IDs.

    ``random.sample`` requires a sequence (passing a set raises ``TypeError``
    on Python 3.11+) and raises ``ValueError`` when asked for more items than
    the population holds. The IDs are therefore copied into a list and the
    sample size is capped at the number of IDs available.

    Args:
        ids: iterable of candidate IDs.
        max_count: upper bound on the number of IDs to return.

    Returns:
        A list of sampled IDs; empty when ``ids`` is empty.
    """
    pool = list(ids)
    if not pool:
        return []
    count = random.randint(1, min(max_count, len(pool)))
    return random.sample(pool, k=count)


def main():
    """Generate upsert, fetch, query and delete traffic against a Pinecone index.

    Configuration is read from the environment: ``PINECONE_API_KEY``,
    ``INDEX_NAME`` and ``ITERATIONS`` (plus ``CLOUD`` and ``REGION`` if the
    index has to be created). The index is created first when it does not
    exist.

    Each iteration:
      1. upserts 1-100 random vectors with random ``genre``/``runtime``
         metadata and records their IDs in ``upserted_ids``;
      2. fetches up to 20 of the recorded IDs;
      3. runs 10 queries with random vectors and ``top_k=10``;
      4. deletes up to 10 of the recorded IDs and forgets them.

    Raises:
        Exception: from ``read_env_var`` if a required variable is unset.
        ValueError: if ``ITERATIONS`` is not an integer.
    """
    pc = PineconeGRPC(api_key=read_env_var("PINECONE_API_KEY"))
    index_name = read_env_var("INDEX_NAME")
    iterations = int(read_env_var("ITERATIONS"))

    create_index_if_not_exists(pc, index_name)
    index = pc.Index(name=index_name)

    for _ in range(iterations):
        try:
            # Upsert some vectors
            items_to_upsert = random.randint(1, 100)
            vector_list = [
                {
                    "id": random_string(10),
                    "values": random_embedding_values(DIMENSION),
                    "metadata": {
                        "genre": random.choice(["action", "comedy", "drama"]),
                        "runtime": random.randint(60, 120),
                    },
                }
                for _ in range(items_to_upsert)
            ]
            index.upsert(vectors=vector_list)
            print(f"Upserted {items_to_upsert} vectors")
            # Record IDs only after the upsert call has returned without error.
            upserted_ids.update(v["id"] for v in vector_list)

            # Fetch some vectors
            ids_to_fetch = _sample_ids(upserted_ids, 20)
            print(f"Fetching {len(ids_to_fetch)} vectors")
            index.fetch(ids=ids_to_fetch)

            # Query some vectors
            print("Querying 10 times")
            for _query in range(10):
                # Query by vector values
                query_vector = random_embedding_values(DIMENSION)
                index.query(vector=query_vector, top_k=10)

            # Delete some vectors
            print("Deleting some vectors")
            ids_to_delete = _sample_ids(upserted_ids, 10)
            index.delete(ids=ids_to_delete)
            # Forget deleted IDs so later fetches/deletes target vectors that
            # are still expected to be in the index.
            upserted_ids.difference_update(ids_to_delete)
        except Exception as e:
            # Deliberately best-effort: report the failure and carry on with
            # the next iteration rather than aborting the whole run.
            print(f"Exception: {e}")
# Run the usage generator only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()