Skip to content

Commit afcf801

Browse files
committed
remove comments
Signed-off-by: pandyamarut <[email protected]>
1 parent 0a2c203 commit afcf801

2 files changed

Lines changed: 14 additions & 59 deletions

File tree

runpod/serverless/modules/rp_scale.py

Lines changed: 9 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,12 @@
1-
"""
2-
runpod | serverless | rp_scale.py
3-
OPTIMIZED VERSION - All performance improvements applied
4-
Now uses optimized JobsProgress from worker_state.py
5-
"""
6-
7-
# ============================================================================
8-
# PERFORMANCE OPTIMIZATIONS - These alone give 3-5x improvement
9-
# ============================================================================
10-
111
import asyncio
122

133
# OPTIMIZATION 1: Use uvloop for 2-4x faster event loop
144
try:
155
import uvloop
166
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
17-
print("✅ RunPod Optimization: uvloop enabled (2-4x faster event loop)")
187
except ImportError:
19-
print("⚠️ RunPod: Install uvloop for 2-4x performance: pip install uvloop")
8+
print("⚠️ RunPod: Install uvloop: pip install uvloop")
209

21-
# OPTIMIZATION 2: Use orjson for 3-10x faster JSON
2210
try:
2311
import orjson
2412
import json as stdlib_json
@@ -34,13 +22,9 @@ def safe_orjson_dumps(obj, **kwargs):
3422
stdlib_json.loads = safe_orjson_loads
3523
stdlib_json.dumps = safe_orjson_dumps
3624

37-
print("✅ RunPod Optimization: orjson enabled (3-10x faster JSON)")
3825
except ImportError:
39-
print("⚠️ RunPod: Install orjson for 3-10x performance: pip install orjson")
26+
print("⚠️ RunPod: Install orjson: pip install orjson")
4027

41-
# ============================================================================
42-
# Original imports with optimizations applied
43-
# ============================================================================
4428

4529
import signal
4630
import sys
@@ -59,7 +43,7 @@ def safe_orjson_dumps(obj, **kwargs):
5943

6044

6145
# ============================================================================
62-
# OPTIMIZATION 3: Job Caching for Batch Fetching
46+
# 3: Job Caching for Batch Fetching
6347
# ============================================================================
6448

6549
class JobCache:
@@ -187,9 +171,7 @@ def start(self):
187171
except ValueError:
188172
log.warning("Signal handling is only supported in the main thread.")
189173

190-
# Print performance stats on shutdown
191-
import atexit
192-
atexit.register(self._print_stats)
174+
193175

194176
asyncio.run(self.run())
195177

@@ -200,7 +182,7 @@ def handle_shutdown(self, signum, frame):
200182
async def run(self):
201183
"""Optimized main loop"""
202184
async with AsyncClientSession() as session:
203-
# OPTIMIZATION: Use create_task instead of gather for better control
185+
# Use create_task instead of gather for better control
204186
tasks = [
205187
asyncio.create_task(self.get_jobs(session), name="job_fetcher"),
206188
asyncio.create_task(self.run_jobs(session), name="job_runner")
@@ -245,7 +227,7 @@ async def get_jobs(self, session: ClientSession):
245227
continue
246228

247229
try:
248-
# OPTIMIZATION: Check cache first
230+
# Check cache first
249231
cached_jobs = await self._job_cache.get_jobs(jobs_needed)
250232
if cached_jobs:
251233
self._stats["cache_hits"] += len(cached_jobs)
@@ -256,7 +238,7 @@ async def get_jobs(self, session: ClientSession):
256238
if jobs_needed <= 0:
257239
continue
258240

259-
# OPTIMIZATION: Fetch more jobs than needed (batching)
241+
# Fetch more jobs than needed (batching)
260242
fetch_count = min(jobs_needed * 3, 50) # Fetch up to 3x needed, max 50
261243

262244
log.debug(f"JobScaler.get_jobs | Fetching {fetch_count} jobs (need {jobs_needed})")
@@ -268,7 +250,7 @@ async def get_jobs(self, session: ClientSession):
268250

269251
if not acquired_jobs:
270252
consecutive_empty += 1
271-
# OPTIMIZATION: Exponential backoff
253+
# Exponential backoff
272254
wait_time = min(0.1 * (2 ** consecutive_empty), 5.0)
273255
await asyncio.sleep(wait_time)
274256
continue
@@ -382,24 +364,4 @@ async def handle_job(self, session: ClientSession, job: dict):
382364
elapsed = time.perf_counter() - start_time
383365
self._stats["total_processing_time"] += elapsed
384366

385-
log.debug("Finished Job", job["id"])
386-
387-
def _print_stats(self):
388-
"""Print performance statistics"""
389-
runtime = time.perf_counter() - self._stats["start_time"]
390-
jobs = self._stats["jobs_processed"]
391-
392-
if runtime > 0 and jobs > 0:
393-
print("\n" + "="*60)
394-
print("RunPod Performance Statistics (Optimized):")
395-
print(f" Runtime: {runtime:.2f}s")
396-
print(f" Jobs processed: {jobs}")
397-
print(f" Jobs fetched: {self._stats['jobs_fetched']}")
398-
print(f" Cache hits: {self._stats['cache_hits']}")
399-
print(f" Cache efficiency: {self._stats['cache_hits'] / max(1, self._stats['jobs_fetched'] + self._stats['cache_hits']) * 100:.1f}%")
400-
print(f" Average job time: {self._stats['total_processing_time'] / jobs:.3f}s")
401-
print(f" Throughput: {jobs / runtime:.2f} jobs/second")
402-
print(" Optimizations enabled:")
403-
print(f" - uvloop: {'Yes' if 'uvloop' in str(asyncio.get_event_loop_policy()) else 'No'}")
404-
print(f" - orjson: {'Yes' if 'orjson' in sys.modules else 'No'}")
405-
print("="*60)
367+
log.debug("Finished Job", job["id"])

runpod/serverless/modules/worker_state.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,3 @@
1-
"""
2-
Handles getting stuff from environment variables and updating the global state like job id.
3-
OPTIMIZED VERSION - Using threading.Lock instead of multiprocessing for performance
4-
"""
5-
61
import os
72
import time
83
import uuid
@@ -60,9 +55,7 @@ def __str__(self) -> str:
6055
return self.id
6156

6257

63-
# ---------------------------------------------------------------------------- #
64-
# Optimized Job Tracker #
65-
# ---------------------------------------------------------------------------- #
58+
6659
class JobsProgress:
6760
"""
6861
OPTIMIZED: Track jobs in progress with O(1) operations using threading.Lock
@@ -120,7 +113,7 @@ def add(self, element: Any):
120113

121114
def get(self, element: Any) -> Optional[Job]:
122115
"""
123-
OPTIMIZED: O(1) retrieval using dict lookup
116+
retrieval using dict lookup
124117
"""
125118
if isinstance(element, str):
126119
search_id = element
@@ -139,7 +132,7 @@ def get(self, element: Any) -> Optional[Job]:
139132

140133
def remove(self, element: Any):
141134
"""
142-
OPTIMIZED: O(1) removal using dict
135+
removal using dict
143136
"""
144137
if isinstance(element, str):
145138
job_id = element
@@ -171,7 +164,7 @@ def get_job_list(self) -> Optional[str]:
171164

172165
def get_job_count(self) -> int:
173166
"""
174-
OPTIMIZED: O(1) count operation
167+
count operation
175168
"""
176169
# No lock needed for reading an int (atomic operation)
177170
return self._count
@@ -191,7 +184,7 @@ def __len__(self):
191184

192185
def __contains__(self, element: Any) -> bool:
193186
"""
194-
OPTIMIZED: O(1) membership test using dict
187+
membership test using dict
195188
"""
196189
if isinstance(element, str):
197190
search_id = element

0 commit comments

Comments (0)