Skip to content

Commit 65215bd

Browse files
Merge pull request #274 from runpod/sls-core-tests
fix: add missing sls core tests
2 parents 1127ffb + 8cf43e9 commit 65215bd

4 files changed

Lines changed: 131 additions & 30 deletions

File tree

.github/tests.json

Lines changed: 78 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
{
33
"hardwareConfig": {
44
"endpointConfig": {
5-
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
6-
"name": "runpod-python E2E Test - Basic"
5+
"name": "runpod-python E2E Test - Basic",
6+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
77
}
88
},
99
"input": {
@@ -13,8 +13,8 @@
1313
{
1414
"hardwareConfig": {
1515
"endpointConfig": {
16-
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
17-
"name": "runpod-python E2E Test - Long Job"
16+
"name": "runpod-python E2E Test - Long Job",
17+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
1818
}
1919
},
2020
"input": {
@@ -25,8 +25,8 @@
2525
{
2626
"hardwareConfig": {
2727
"endpointConfig": {
28-
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
29-
"name": "runpod-python E2E Test - Generator Handler"
28+
"name": "runpod-python E2E Test - Generator Handler",
29+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
3030
},
3131
"templateConfig": {
3232
"dockerArgs": "python3 -u /handler.py --generator --return_aggregate_stream"
@@ -43,8 +43,8 @@
4343
{
4444
"hardwareConfig": {
4545
"endpointConfig": {
46-
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
47-
"name": "runpod-python E2E Test - Async Generator Handler"
46+
"name": "runpod-python E2E Test - Async Generator Handler",
47+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
4848
},
4949
"templateConfig": {
5050
"dockerArgs": "python3 -u /handler.py --async_generator --return_aggregate_stream"
@@ -61,8 +61,8 @@
6161
{
6262
"hardwareConfig": {
6363
"endpointConfig": {
64-
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
65-
"name": "runpod-python E2E Test - Serverless Core"
64+
"name": "runpod-python E2E Test - Serverless Core - Basic",
65+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
6666
},
6767
"templateConfig": {
6868
"env": [
@@ -76,5 +76,73 @@
7676
"input": {
7777
"mock_return": "this worked!"
7878
}
79+
},
80+
{
81+
"hardwareConfig": {
82+
"endpointConfig": {
83+
"name": "runpod-python E2E Test - Serverless Core - Long Job",
84+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
85+
},
86+
"templateConfig": {
87+
"env": [
88+
{
89+
"key": "RUNPOD_USE_CORE",
90+
"value": "true"
91+
}
92+
]
93+
}
94+
},
95+
"input": {
96+
"mock_return": "Delay test successful returned after waiting 5 minutes.",
97+
"mock_delay": 300
98+
}
99+
},
100+
{
101+
"hardwareConfig": {
102+
"endpointConfig": {
103+
"name": "runpod-python E2E Test - Serverless Core - Generator Handler",
104+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
105+
},
106+
"templateConfig": {
107+
"dockerArgs": "python3 -u /handler.py --generator --return_aggregate_stream",
108+
"env": [
109+
{
110+
"key": "RUNPOD_USE_CORE",
111+
"value": "true"
112+
}
113+
]
114+
}
115+
},
116+
"input": {
117+
"mock_return": [
118+
"value1",
119+
"value2",
120+
"value3"
121+
]
122+
}
123+
},
124+
{
125+
"hardwareConfig": {
126+
"endpointConfig": {
127+
"name": "runpod-python E2E Test - Serverless Core - Async Generator Handler",
128+
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
129+
},
130+
"templateConfig": {
131+
"dockerArgs": "python3 -u /handler.py --async_generator --return_aggregate_stream",
132+
"env": [
133+
{
134+
"key": "RUNPOD_USE_CORE",
135+
"value": "true"
136+
}
137+
]
138+
}
139+
},
140+
"input": {
141+
"mock_return": [
142+
"value1",
143+
"value2",
144+
"value3"
145+
]
146+
}
79147
}
80148
]

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Change Log
22

3+
## Release 1.6.0 (1/29/24)
4+
5+
### Fixed
6+
7+
- Rust Serverless Core Passing all tests.
8+
- GitHub Action and Python package updates
9+
- Changelog date typo
10+
311
## Release 1.5.3 (1/25/24)
412

513
### Added

runpod/serverless/core.py

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@
99
from ctypes import CDLL, byref, c_char_p, c_int
1010
from typing import Any, Callable, List, Dict, Optional
1111

12+
from runpod.version import __version__ as runpod_version
1213
from runpod.serverless.modules.rp_logger import RunPodLogger
13-
14+
from runpod.serverless.modules import rp_job
1415

1516
log = RunPodLogger()
1617

@@ -44,6 +45,7 @@ class Hook: # pylint: disable=too-many-instance-attributes
4445

4546
def __new__(cls):
4647
if Hook._instance is None:
48+
log.debug("SLS Core | Initializing Hook.")
4749
Hook._instance = object.__new__(cls)
4850
Hook._initialized = False
4951
return Hook._instance
@@ -136,7 +138,7 @@ def progress_update(self, job_id: str, json_data: bytes) -> bool:
136138
c_char_p(json_data), c_int(len(json_data))
137139
))
138140

139-
def stream_output(self, job_id: str, job_output: bytes) -> bool:
141+
async def stream_output(self, job_id: str, job_output: bytes) -> bool:
140142
"""
141143
send part of a streaming result to AI-API.
142144
"""
@@ -170,48 +172,70 @@ def finish_stream(self, job_id: str) -> bool:
170172

171173

172174
# -------------------------------- Process Job ------------------------------- #
173-
async def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
175+
async def _process_job(config: Dict[str, Any], job: Dict[str, Any], hook) -> Dict[str, Any]:
174176
""" Process a single job. """
175-
hook = Hook()
177+
handler = config['handler']
176178

179+
result = {}
177180
try:
178-
result = handler(job)
179-
except Exception as err:
180-
raise RuntimeError(
181-
f"run {job['id']}: user code raised an {type(err).__name__}") from err
181+
if inspect.isgeneratorfunction(handler) or inspect.isasyncgenfunction(handler):
182+
log.debug("SLS Core | Running job as a generator.")
183+
generator_output = rp_job.run_job_generator(handler, job)
184+
aggregated_output = {'output': []}
185+
186+
async for part in generator_output:
187+
log.debug(f"SLS Core | Streaming output: {part}", job['id'])
188+
189+
if 'error' in part:
190+
aggregated_output = part
191+
break
192+
if config.get('return_aggregate_stream', False):
193+
aggregated_output['output'].append(part['output'])
194+
195+
await hook.stream_output(job['id'], part)
182196

183-
if inspect.isgeneratorfunction(handler):
184-
for part in result:
185-
hook.stream_output(job['id'], part)
197+
log.debug("SLS Core | Finished streaming output.", job['id'])
198+
hook.finish_stream(job['id'])
199+
result = aggregated_output
186200

187-
hook.finish_stream(job['id'])
201+
else:
202+
log.debug("SLS Core | Running job as a standard function.")
203+
result = await rp_job.run_job(handler, job)
204+
result = result.get('output', result)
205+
206+
except Exception as err: # pylint: disable=broad-except
207+
log.error(f"SLS Core | Error running job: {err}", job['id'])
208+
result = {'error': str(err)}
188209

189-
else:
210+
finally:
211+
log.debug(f"SLS Core | Posting output: {result}", job['id'])
190212
hook.post_output(job['id'], result)
191213

192214

193-
# -------------------------------- Run Worker -------------------------------- #
215+
# ---------------------------------------------------------------------------- #
216+
# Run Worker #
217+
# ---------------------------------------------------------------------------- #
194218
async def run(config: Dict[str, Any]) -> None:
195219
""" Run the worker.
196220
197221
Args:
198222
config: A dictionary containing the following keys:
199223
handler: A function that takes a job and returns a result.
200224
"""
201-
handler = config['handler']
202-
max_concurrency = config.get('max_concurrency', 4)
203-
max_jobs = config.get('max_jobs', 4)
225+
max_concurrency = config.get('max_concurrency', 1)
226+
max_jobs = config.get('max_jobs', 1)
204227

205-
hook = Hook()
228+
serverless_hook = Hook()
206229

207230
while True:
208-
jobs = hook.get_jobs(max_concurrency, max_jobs)
231+
jobs = serverless_hook.get_jobs(max_concurrency, max_jobs)
209232

210233
if len(jobs) == 0 or jobs is None:
234+
await asyncio.sleep(0)
211235
continue
212236

213237
for job in jobs:
214-
asyncio.create_task(_process_job(handler, job), name=job['id'])
238+
asyncio.create_task(_process_job(config, job, serverless_hook), name=job['id'])
215239
await asyncio.sleep(0)
216240

217241
await asyncio.sleep(0)
@@ -220,6 +244,7 @@ async def run(config: Dict[str, Any]) -> None:
220244
def main(config: Dict[str, Any]) -> None:
221245
"""Run the worker in an asyncio event loop."""
222246
if config.get('handler') is None:
247+
log.error("SLS Core | config must contain a handler function")
223248
raise ValueError("config must contain a handler function")
224249

225250
try:

runpod/serverless/sls_core.so

-278 KB
Binary file not shown.

0 commit comments

Comments
 (0)