1- """
2- runpod | serverless | rp_scale.py
3- OPTIMIZED VERSION - All performance improvements applied
4- Now uses optimized JobsProgress from worker_state.py
5- """
6-
7- # ============================================================================
8- # PERFORMANCE OPTIMIZATIONS - These alone give 3-5x improvement
9- # ============================================================================
10-
111import asyncio
122
133# OPTIMIZATION 1: Use uvloop for 2-4x faster event loop
144try :
155 import uvloop
166 asyncio .set_event_loop_policy (uvloop .EventLoopPolicy ())
17- print ("✅ RunPod Optimization: uvloop enabled (2-4x faster event loop)" )
187except ImportError :
19- print ("⚠️ RunPod: Install uvloop for 2-4x performance : pip install uvloop" )
8+ print ("⚠️ RunPod: Install uvloop: pip install uvloop" )
209
21- # OPTIMIZATION 2: Use orjson for 3-10x faster JSON
2210try :
2311 import orjson
2412 import json as stdlib_json
@@ -34,13 +22,9 @@ def safe_orjson_dumps(obj, **kwargs):
3422 stdlib_json .loads = safe_orjson_loads
3523 stdlib_json .dumps = safe_orjson_dumps
3624
37- print ("✅ RunPod Optimization: orjson enabled (3-10x faster JSON)" )
3825except ImportError :
39- print ("⚠️ RunPod: Install orjson for 3-10x performance : pip install orjson" )
26+ print ("⚠️ RunPod: Install orjson: pip install orjson" )
4027
41- # ============================================================================
42- # Original imports with optimizations applied
43- # ============================================================================
4428
4529import signal
4630import sys
@@ -59,7 +43,7 @@ def safe_orjson_dumps(obj, **kwargs):
5943
6044
6145# ============================================================================
62- # OPTIMIZATION 3: Job Caching for Batch Fetching
46+ # 3: Job Caching for Batch Fetching
6347# ============================================================================
6448
6549class JobCache :
@@ -187,9 +171,7 @@ def start(self):
187171 except ValueError :
188172 log .warning ("Signal handling is only supported in the main thread." )
189173
190- # Print performance stats on shutdown
191- import atexit
192- atexit .register (self ._print_stats )
174+
193175
194176 asyncio .run (self .run ())
195177
@@ -200,7 +182,7 @@ def handle_shutdown(self, signum, frame):
200182 async def run (self ):
201183 """Optimized main loop"""
202184 async with AsyncClientSession () as session :
203- # OPTIMIZATION: Use create_task instead of gather for better control
185+ # Use create_task instead of gather for better control
204186 tasks = [
205187 asyncio .create_task (self .get_jobs (session ), name = "job_fetcher" ),
206188 asyncio .create_task (self .run_jobs (session ), name = "job_runner" )
@@ -245,7 +227,7 @@ async def get_jobs(self, session: ClientSession):
245227 continue
246228
247229 try :
248- # OPTIMIZATION: Check cache first
230+ # Check cache first
249231 cached_jobs = await self ._job_cache .get_jobs (jobs_needed )
250232 if cached_jobs :
251233 self ._stats ["cache_hits" ] += len (cached_jobs )
@@ -256,7 +238,7 @@ async def get_jobs(self, session: ClientSession):
256238 if jobs_needed <= 0 :
257239 continue
258240
259- # OPTIMIZATION: Fetch more jobs than needed (batching)
241+ # Fetch more jobs than needed (batching)
260242 fetch_count = min (jobs_needed * 3 , 50 ) # Fetch up to 3x needed, max 50
261243
262244 log .debug (f"JobScaler.get_jobs | Fetching { fetch_count } jobs (need { jobs_needed } )" )
@@ -268,7 +250,7 @@ async def get_jobs(self, session: ClientSession):
268250
269251 if not acquired_jobs :
270252 consecutive_empty += 1
271- # OPTIMIZATION: Exponential backoff
253+ # Exponential backoff
272254 wait_time = min (0.1 * (2 ** consecutive_empty ), 5.0 )
273255 await asyncio .sleep (wait_time )
274256 continue
@@ -382,24 +364,4 @@ async def handle_job(self, session: ClientSession, job: dict):
382364 elapsed = time .perf_counter () - start_time
383365 self ._stats ["total_processing_time" ] += elapsed
384366
385- log .debug ("Finished Job" , job ["id" ])
386-
387- def _print_stats (self ):
388- """Print performance statistics"""
389- runtime = time .perf_counter () - self ._stats ["start_time" ]
390- jobs = self ._stats ["jobs_processed" ]
391-
392- if runtime > 0 and jobs > 0 :
393- print ("\n " + "=" * 60 )
394- print ("RunPod Performance Statistics (Optimized):" )
395- print (f" Runtime: { runtime :.2f} s" )
396- print (f" Jobs processed: { jobs } " )
397- print (f" Jobs fetched: { self ._stats ['jobs_fetched' ]} " )
398- print (f" Cache hits: { self ._stats ['cache_hits' ]} " )
399- print (f" Cache efficiency: { self ._stats ['cache_hits' ] / max (1 , self ._stats ['jobs_fetched' ] + self ._stats ['cache_hits' ]) * 100 :.1f} %" )
400- print (f" Average job time: { self ._stats ['total_processing_time' ] / jobs :.3f} s" )
401- print (f" Throughput: { jobs / runtime :.2f} jobs/second" )
402- print (" Optimizations enabled:" )
403- print (f" - uvloop: { 'Yes' if 'uvloop' in str (asyncio .get_event_loop_policy ()) else 'No' } " )
404- print (f" - orjson: { 'Yes' if 'orjson' in sys .modules else 'No' } " )
405- print ("=" * 60 )
367+ log .debug ("Finished Job" , job ["id" ])
0 commit comments