Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Change Log

## Release 1.3.7 (11/29/23)

### Fixed

- Catch timeouts when checking for available jobs.

### Changed

- Updated and pinned aiohttp to 3.9.1

---

## Release 1.3.6 (11/23/23)

## Fixed
Expand Down
22 changes: 15 additions & 7 deletions runpod/serverless/modules/rp_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
else:
next_job = received_request

except asyncio.TimeoutError:
log.debug("Timeout error, retrying.")
if retry is False:
break

except Exception as err: # pylint: disable=broad-except
err_type = type(err).__name__
err_message = str(err)
Expand All @@ -102,8 +107,6 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]

await asyncio.sleep(1)
else:
log.debug("Confirmed valid request.", next_job['id'])

job_list.add_job(next_job["id"])
log.debug("Request ID added.", next_job['id'])

Expand All @@ -115,10 +118,16 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
"""
Run the job using the handler.
Returns the job output or error.

Args:
handler (Callable): The handler function to use.
job (Dict[str, Any]): The job to run.

Returns:
Dict[str, Any]: The result of running the job.
"""
log.info('Started', job["id"])
run_result = {"error": "No output from handler."}
log.info('Started.', job["id"])
run_result = {}

try:
handler_return = handler(job)
Expand All @@ -129,8 +138,7 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
if isinstance(job_output, dict):
error_msg = job_output.pop("error", None)
refresh_worker = job_output.pop("refresh_worker", None)

run_result = {"output": job_output}
run_result['output'] = job_output

if error_msg:
run_result["error"] = error_msg
Expand Down
17 changes: 17 additions & 0 deletions tests/test_serverless/test_modules/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Test Serverless Job Module
'''

import asyncio
from unittest.mock import Mock, patch

from unittest import IsolatedAsyncioTestCase
Expand Down Expand Up @@ -139,6 +140,22 @@ async def test_get_job_no_input(self):
assert job is None
assert mock_log.error.call_count == 1

async def test_get_job_no_timeout(self):
""" Tests the get_job function with a timeout """
# Timeout Mock
response_timeout = Mock(ClientResponse)
response_timeout.status = 200

with patch("aiohttp.ClientSession") as mock_session_timeout, \
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):

mock_session_timeout.get.return_value.__aenter__.side_effect = asyncio.TimeoutError
job = await rp_job.get_job(mock_session_timeout, retry=False)

assert job is None
assert mock_log.error.call_count == 0

async def test_get_job_exception(self):
'''
Tests the get_job function with an exception
Expand Down