-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathsock_test.py
More file actions
executable file
·149 lines (114 loc) · 5.15 KB
/
sock_test.py
File metadata and controls
executable file
·149 lines (114 loc) · 5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#! /bin/env python3
'''
SOCK-specific tests
'''
#pylint: disable=global-statement,too-many-statements,missing-function-docstring,wrong-import-position
import argparse
import os
import sys
import subprocess
from time import time
from multiprocessing import Pool
from helper import SockWorker, prepare_open_lambda, setup_config, TestConfContext
from helper.test import set_test_filter, start_tests, check_test_results, set_worker_type, test
# You can either install the OpenLambda Python bindings
# or run the test from the project's root folder
sys.path.append('python/src')
from open_lambda import OpenLambda
@test
def install_examples_to_worker_registry():
"""Install all lambda functions from examples directory
to worker registry using admin install"""
examples_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "examples"
)
if not os.path.exists(examples_dir):
print(f"Examples directory not found at {examples_dir}")
return
# Get all directories in examples
example_functions = []
for item in os.listdir(examples_dir):
item_path = os.path.join(examples_dir, item)
if os.path.isdir(item_path):
# Check if it has f.py (required for lambda functions)
if os.path.exists(os.path.join(item_path, "f.py")):
example_functions.append(item_path)
print(f"Found {len(example_functions)} lambda functions in examples directory")
# Install each function using admin install command
# Find the ol binary - it should be in the project root
project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
ol_binary = os.path.join(project_root, "ol")
if not os.path.exists(ol_binary):
print(f"✗ OL binary not found at {ol_binary}")
return
# Get OL_DIR from the global args - for sock_test we'll use test-dir
ol_dir = "test-dir"
registry_dir = os.path.join(project_root, ol_dir, "registry")
for func_dir in example_functions:
func_name = os.path.basename(func_dir)
print(f"Installing {func_name} from {func_dir}")
try:
# Run ol admin install -p <worker_path> <function_directory>
result = subprocess.run([ol_binary, "admin", "install", f"-p={ol_dir}", func_dir],
capture_output=True, text=True, cwd=project_root)
if result.returncode == 0:
dest = os.path.abspath(os.path.join(registry_dir, f"{func_name}.tar.gz"))
print(f"✓ Successfully installed {func_name} → {dest}")
else:
print(f"✗ Failed to install {func_name}: {result.stderr}")
except subprocess.SubprocessError as e:
print(f"✗ Error installing {func_name}: {e}")
print("Finished installing example functions")
def sock_churn_task(args):
open_lambda = OpenLambda()
echo_path, parent, start, seconds = args
count = 0
while time() < start + seconds:
sandbox_id = open_lambda.create({"code": echo_path, "leaf": True, "parent": parent})
open_lambda.destroy(sandbox_id)
count += 1
return count
@test
def sock_churn(baseline, procs, seconds, fork):
# baseline: how many sandboxes are sitting idly throughout the experiment
# procs: how many procs are concurrently creating and deleting other sandboxes
echo_path = os.path.abspath("registry/echo.tar.gz")
open_lambda = OpenLambda()
if fork:
parent = open_lambda.create({"code": "", "leaf": False})
else:
parent = ""
for _ in range(baseline):
sandbox_id = open_lambda.create({"code": echo_path, "leaf": True, "parent": parent})
open_lambda.pause(sandbox_id)
start = time()
with Pool(procs) as pool:
reqs = sum(pool.map(sock_churn_task, [(echo_path, parent, start, seconds)] * procs,
chunksize=1))
return {"sandboxes_per_sec": reqs/seconds}
def run_tests():
print("Testing SOCK directly (without lambdas)")
with TestConfContext(server_mode="sock", mem_pool_mb=500):
install_examples_to_worker_registry()
sock_churn(baseline=0, procs=1, seconds=5, fork=False)
sock_churn(baseline=0, procs=1, seconds=10, fork=True)
sock_churn(baseline=0, procs=15, seconds=10, fork=True)
sock_churn(baseline=32, procs=1, seconds=10, fork=True)
sock_churn(baseline=32, procs=15, seconds=10, fork=True)
def main():
parser = argparse.ArgumentParser(description='Run SOCK-specific tests for OpenLambda')
parser.add_argument('--test_filter', type=str, default="")
parser.add_argument('--ol_dir', type=str, default="test-dir")
parser.add_argument('--registry', type=str, default="registry")
args = parser.parse_args()
set_test_filter([name for name in args.test_filter.split(",") if name != ''])
set_worker_type(SockWorker)
setup_config(args.ol_dir)
prepare_open_lambda(args.ol_dir)
start_tests()
registry_path = "file://" + os.path.abspath(args.registry)
with TestConfContext(registry=registry_path, limits={"installer_mem_mb": 250}):
run_tests()
check_test_results()
if __name__ == '__main__':
main()