Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 78 additions & 10 deletions .github/tests.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Basic"
"name": "runpod-python E2E Test - Basic",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
}
},
"input": {
Expand All @@ -13,8 +13,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Long Job"
"name": "runpod-python E2E Test - Long Job",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
}
},
"input": {
Expand All @@ -25,8 +25,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Generator Handler"
"name": "runpod-python E2E Test - Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --generator --return_aggregate_stream"
Expand All @@ -43,8 +43,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Async Generator Handler"
"name": "runpod-python E2E Test - Async Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --async_generator --return_aggregate_stream"
Expand All @@ -61,8 +61,8 @@
{
"hardwareConfig": {
"endpointConfig": {
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80",
"name": "runpod-python E2E Test - Serverless Core"
"name": "runpod-python E2E Test - Serverless Core - Basic",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"env": [
Expand All @@ -76,5 +76,73 @@
"input": {
"mock_return": "this worked!"
}
},
{
"hardwareConfig": {
"endpointConfig": {
"name": "runpod-python E2E Test - Serverless Core - Long Job",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": "Delay test successful returned after waiting 5 minutes.",
"mock_delay": 300
}
},
{
"hardwareConfig": {
"endpointConfig": {
"name": "runpod-python E2E Test - Serverless Core - Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --generator --return_aggregate_stream",
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": [
"value1",
"value2",
"value3"
]
}
},
{
"hardwareConfig": {
"endpointConfig": {
"name": "runpod-python E2E Test - Serverless Core - Async Generator Handler",
"gpuIds": "ADA_24,AMPERE_16,AMPERE_24,AMPERE_48,AMPERE_80"
},
"templateConfig": {
"dockerArgs": "python3 -u /handler.py --async_generator --return_aggregate_stream",
"env": [
{
"key": "RUNPOD_USE_CORE",
"value": "true"
}
]
}
},
"input": {
"mock_return": [
"value1",
"value2",
"value3"
]
}
}
]
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Change Log

## Release 1.6.0 (1/29/24)

### Fixed

- Rust Serverless Core Passing all tests.
- GitHub Action and Python package updates
- Changelog date typo

## Release 1.5.3 (1/25/24)

### Added
Expand Down
65 changes: 45 additions & 20 deletions runpod/serverless/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
from ctypes import CDLL, byref, c_char_p, c_int
from typing import Any, Callable, List, Dict, Optional

from runpod.version import __version__ as runpod_version
from runpod.serverless.modules.rp_logger import RunPodLogger

from runpod.serverless.modules import rp_job

log = RunPodLogger()

Expand Down Expand Up @@ -44,6 +45,7 @@ class Hook: # pylint: disable=too-many-instance-attributes

def __new__(cls):
if Hook._instance is None:
log.debug("SLS Core | Initializing Hook.")
Hook._instance = object.__new__(cls)
Hook._initialized = False
return Hook._instance
Expand Down Expand Up @@ -136,7 +138,7 @@ def progress_update(self, job_id: str, json_data: bytes) -> bool:
c_char_p(json_data), c_int(len(json_data))
))

def stream_output(self, job_id: str, job_output: bytes) -> bool:
async def stream_output(self, job_id: str, job_output: bytes) -> bool:
"""
send part of a streaming result to AI-API.
"""
Expand Down Expand Up @@ -170,48 +172,70 @@ def finish_stream(self, job_id: str) -> bool:


# -------------------------------- Process Job ------------------------------- #
async def _process_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
async def _process_job(config: Dict[str, Any], job: Dict[str, Any], hook) -> Dict[str, Any]:
""" Process a single job. """
hook = Hook()
handler = config['handler']

result = {}
try:
result = handler(job)
except Exception as err:
raise RuntimeError(
f"run {job['id']}: user code raised an {type(err).__name__}") from err
if inspect.isgeneratorfunction(handler) or inspect.isasyncgenfunction(handler):
log.debug("SLS Core | Running job as a generator.")
generator_output = rp_job.run_job_generator(handler, job)
aggregated_output = {'output': []}

async for part in generator_output:
log.debug(f"SLS Core | Streaming output: {part}", job['id'])

if 'error' in part:
aggregated_output = part
break
if config.get('return_aggregate_stream', False):
aggregated_output['output'].append(part['output'])

await hook.stream_output(job['id'], part)

if inspect.isgeneratorfunction(handler):
for part in result:
hook.stream_output(job['id'], part)
log.debug("SLS Core | Finished streaming output.", job['id'])
hook.finish_stream(job['id'])
result = aggregated_output

hook.finish_stream(job['id'])
else:
log.debug("SLS Core | Running job as a standard function.")
result = await rp_job.run_job(handler, job)
result = result.get('output', result)

except Exception as err: # pylint: disable=broad-except
log.error(f"SLS Core | Error running job: {err}", job['id'])
result = {'error': str(err)}

else:
finally:
log.debug(f"SLS Core | Posting output: {result}", job['id'])
hook.post_output(job['id'], result)


# -------------------------------- Run Worker -------------------------------- #
# ---------------------------------------------------------------------------- #
# Run Worker #
# ---------------------------------------------------------------------------- #
async def run(config: Dict[str, Any]) -> None:
""" Run the worker.

Args:
config: A dictionary containing the following keys:
handler: A function that takes a job and returns a result.
"""
handler = config['handler']
max_concurrency = config.get('max_concurrency', 4)
max_jobs = config.get('max_jobs', 4)
max_concurrency = config.get('max_concurrency', 1)
max_jobs = config.get('max_jobs', 1)

hook = Hook()
serverless_hook = Hook()

while True:
jobs = hook.get_jobs(max_concurrency, max_jobs)
jobs = serverless_hook.get_jobs(max_concurrency, max_jobs)

if len(jobs) == 0 or jobs is None:
await asyncio.sleep(0)
continue

for job in jobs:
asyncio.create_task(_process_job(handler, job), name=job['id'])
asyncio.create_task(_process_job(config, job, serverless_hook), name=job['id'])
await asyncio.sleep(0)

await asyncio.sleep(0)
Expand All @@ -220,6 +244,7 @@ async def run(config: Dict[str, Any]) -> None:
def main(config: Dict[str, Any]) -> None:
"""Run the worker in an asyncio event loop."""
if config.get('handler') is None:
log.error("SLS Core | config must contain a handler function")
raise ValueError("config must contain a handler function")

try:
Expand Down
Binary file modified runpod/serverless/sls_core.so
Binary file not shown.