Skip to content

Commit 0293403

Browse files
Merge pull request #240 from runpod/job_get_fail
Job get fail
2 parents 5581e2b + c3936af commit 0293403

3 files changed

Lines changed: 44 additions & 7 deletions

File tree

CHANGELOG.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,17 @@
11
# Change Log
22

3+
## Release 1.3.7 (11/29/23)
4+
5+
### Fixed
6+
7+
- Catch timeouts when checking for available jobs.
8+
9+
### Changed
10+
11+
- Updated and pinned aiohttp to 3.9.1
12+
13+
---
14+
315
## Release 1.3.6 (11/23/23)
416

517
## Fixed

runpod/serverless/modules/rp_job.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
8888
else:
8989
next_job = received_request
9090

91+
except asyncio.TimeoutError:
92+
log.debug("Timeout error, retrying.")
93+
if retry is False:
94+
break
95+
9196
except Exception as err: # pylint: disable=broad-except
9297
err_type = type(err).__name__
9398
err_message = str(err)
@@ -102,8 +107,6 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
102107

103108
await asyncio.sleep(1)
104109
else:
105-
log.debug("Confirmed valid request.", next_job['id'])
106-
107110
job_list.add_job(next_job["id"])
108111
log.debug("Request ID added.", next_job['id'])
109112

@@ -115,10 +118,16 @@ async def get_job(session: ClientSession, retry=True) -> Optional[Dict[str, Any]
115118
async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
116119
"""
117120
Run the job using the handler.
118-
Returns the job output or error.
121+
122+
Args:
123+
handler (Callable): The handler function to use.
124+
job (Dict[str, Any]): The job to run.
125+
126+
Returns:
127+
Dict[str, Any]: The result of running the job.
119128
"""
120-
log.info('Started', job["id"])
121-
run_result = {"error": "No output from handler."}
129+
log.info('Started.', job["id"])
130+
run_result = {}
122131

123132
try:
124133
handler_return = handler(job)
@@ -129,8 +138,7 @@ async def run_job(handler: Callable, job: Dict[str, Any]) -> Dict[str, Any]:
129138
if isinstance(job_output, dict):
130139
error_msg = job_output.pop("error", None)
131140
refresh_worker = job_output.pop("refresh_worker", None)
132-
133-
run_result = {"output": job_output}
141+
run_result['output'] = job_output
134142

135143
if error_msg:
136144
run_result["error"] = error_msg

tests/test_serverless/test_modules/test_job.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Test Serverless Job Module
33
'''
44

5+
import asyncio
56
from unittest.mock import Mock, patch
67

78
from unittest import IsolatedAsyncioTestCase
@@ -139,6 +140,22 @@ async def test_get_job_no_input(self):
139140
assert job is None
140141
assert mock_log.error.call_count == 1
141142

143+
async def test_get_job_no_timeout(self):
144+
""" Tests the get_job function with a timeout """
145+
# Timeout Mock
146+
response_timeout = Mock(ClientResponse)
147+
response_timeout.status = 200
148+
149+
with patch("aiohttp.ClientSession") as mock_session_timeout, \
150+
patch("runpod.serverless.modules.rp_job.log", new_callable=Mock) as mock_log, \
151+
patch("runpod.serverless.modules.rp_job.JOB_GET_URL", "http://mock.url"):
152+
153+
mock_session_timeout.get.return_value.__aenter__.side_effect = asyncio.TimeoutError
154+
job = await rp_job.get_job(mock_session_timeout, retry=False)
155+
156+
assert job is None
157+
assert mock_log.error.call_count == 0
158+
142159
async def test_get_job_exception(self):
143160
'''
144161
Tests the get_job function with an exception

0 commit comments

Comments
 (0)