This guide provides detailed technical information for developers working with or extending the SCIoT system.
SCIoT implements a distributed split computing architecture with intelligent offloading, variance detection, and client resilience.
┌─────────────┐ ┌──────────────────┐ ┌─────────────┐
│ Device │◄──►│ Edge Server │◄──►│ Other Clients│
│ │ │ │ │ │
│ Layer 0-N │ │ Layer N+1-58 │ │ Layer 0-M │
│ Local Inf. │ │ Variance Detect. │ │ Local Inf. │
│ Resilience │ │ Local Inf. Mode │ │ Resilience │
└─────────────┘ └──────────────────┘ └─────────────┘
Purpose: Monitor inference time stability and trigger re-evaluation when performance changes significantly.
class InferenceTimeHistory:
def __init__(self, layer_id: int, window_size: int = 10):
self.layer_id = layer_id
self.window_size = window_size
self.measurements = deque(maxlen=window_size)
def add_measurement(self, time: float) -> bool:
"""Add measurement and return True if variance threshold exceeded"""
self.measurements.append(time)
return not self.is_stable()
def is_stable(self, cv_threshold: float = 0.15) -> bool:
"""Check if coefficient of variation is below threshold"""
if len(self.measurements) < 3:
return True
mean_time = statistics.mean(self.measurements)
std_time = statistics.stdev(self.measurements)
cv = std_time / mean_time if mean_time > 0 else 0
return cv <= cv_thresholdKey algorithms:
- Coefficient of Variation (CV): CV = σ/μ — a normalized measure independent of scale
- Sliding Window: Last N measurements (default: 10)
- Threshold: 15% CV triggers re-evaluation
- Cascade Propagation: Variance at layer i triggers a check at layer i+1
class VarianceDetector:
def __init__(self, window_size: int = 10, variance_threshold: float = 0.15):
self.layer_histories: Dict[int, InferenceTimeHistory] = {}
self.cascade_enabled = True
def add_measurement(self, layer_id: int, time: float) -> Dict[str, Any]:
"""Add measurement and return detection result"""
if layer_id not in self.layer_histories:
self.layer_histories[layer_id] = InferenceTimeHistory(layer_id)
needs_retest = self.layer_histories[layer_id].add_measurement(time)
result = {
'needs_retest': needs_retest,
'cascaded_layers': []
}
# Cascade to next layer if enabled and variance detected
if needs_retest and self.cascade_enabled:
next_layer = layer_id + 1
if next_layer in self.layer_histories:
next_needs_retest = not self.layer_histories[next_layer].is_stable()
if next_needs_retest:
result['cascaded_layers'].append(next_layer)
return resultPurpose: Periodically force device to run all layers locally to refresh measurements and validate performance.
class RequestHandler:
def __init__(self):
self.local_inference_config = self._load_local_inference_config()
def handle_inference_request(self, request_data: dict) -> dict:
"""Process inference request with optional local inference forcing"""
# Check if should force local inference
if self._should_force_local_inference():
logger.info("📍 LOCAL INFERENCE forced (probability hit)")
return {
'offloading_layer': -1, # Special value for "all local"
'message': 'Local inference mode'
}
# Normal offloading algorithm
offloading_layer = self._calculate_offloading_layer(request_data)
return {
'offloading_layer': offloading_layer,
'message': 'Normal offloading'
}
def _should_force_local_inference(self) -> bool:
"""Determine if current request should force local inference"""
if not self.local_inference_config.get('enabled', False):
return False
probability = self.local_inference_config.get('probability', 0.1)
return random.random() < probabilitydef run_split_inference(image, tflite_dir, stop_layer):
"""Run inference with support for local inference mode (-1)"""
input_data = image
inference_times = []
# Handle -1 as "run all layers until the end"
if stop_layer == -1:
stop_layer = LAST_OFFLOADING_LAYER # 58 for FOMO 96x96
print(f"Offloading layer -1: Running all {stop_layer + 1} layers locally")
# Run inference layers
for i in range(stop_layer + 1):
model_path = f"{tflite_dir}/model_{i}.tflite"
start_time = time.perf_counter()
# Load and run layer
interpreter = tf.lite.Interpreter(model_path=model_path)
interpreter.allocate_tensors()
interpreter.set_tensor(interpreter.get_input_details()[0]['index'], input_data)
interpreter.invoke()
output_data = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])
end_time = time.perf_counter()
inference_times.append(end_time - start_time)
input_data = output_data
return input_data, inference_timesPurpose: Handle network failures gracefully without crashing or losing data.
class HttpClient:
def __init__(self, config_path: str):
self.config = self._load_config(config_path)
self.timeout = self.config['server']['timeout']
self.base_url = self.config['server']['base_url']
self.retry_count = 3
self.retry_delay = 1.0 # Initial delay
def send_inference_request(self, data: dict) -> Optional[dict]:
"""Send request with retry logic and timeout handling"""
for attempt in range(self.retry_count):
try:
response = requests.post(
f"{self.base_url}{self.config['server']['inference_endpoint']}",
json=data,
timeout=self.timeout
)
if response.status_code == 200:
return response.json()
else:
logger.warning(f"Server returned status {response.status_code}")
except requests.exceptions.Timeout:
logger.warning(f"Request timeout (attempt {attempt + 1}/{self.retry_count})")
except requests.exceptions.ConnectionError:
logger.warning(f"Connection error (attempt {attempt + 1}/{self.retry_count})")
except Exception as e:
logger.error(f"Unexpected error: {e}")
# Exponential backoff
if attempt < self.retry_count - 1:
time.sleep(self.retry_delay * (2 ** attempt))
logger.error("All retry attempts failed, falling back to local inference")
return None
def run_inference_cycle(self):
"""Run inference with graceful degradation"""
image = self._load_image()
# Try server offloading first
server_response = self.send_inference_request({
'request_id': str(uuid.uuid4()),
'timestamp': datetime.now().isoformat()
})
if server_response:
# Normal split computing
offloading_layer = server_response['offloading_layer']
output, device_times = run_split_inference(image, self.tflite_dir, offloading_layer)
# Send results back
self._send_results(device_times, offloading_layer)
else:
# Fallback to local-only inference
logger.info("🔄 Falling back to local-only inference")
output, device_times = run_split_inference(image, self.tflite_dir, LAST_OFFLOADING_LAYER)
return outputclass ModelManager:
def __init__(self, inference_times: dict,
computation_delay_config: dict = None,
variance_detector: VarianceDetector = None):
self.inference_times = inference_times
self.delay_config = computation_delay_config
self.variance_detector = variance_detector
self.loaded_models = {}
def get_inference_time_with_variance_check(self, layer_id: int, measured_time: float) -> float:
"""Get inference time and check for variance"""
if self.variance_detector:
result = self.variance_detector.add_measurement(layer_id, measured_time)
if result['needs_retest']:
logger.warning(f"⚠️ VARIANCE DETECTED at layer {layer_id}")
if result['cascaded_layers']:
for cascaded_layer in result['cascaded_layers']:
logger.info(f"🔄 CASCADE: Variance propagated to layer {cascaded_layer}")
# Apply delay simulation if configured
if self.delay_config and self.delay_config.get('enabled', False):
delay = self._generate_delay()
return measured_time + delay
return measured_time
def _generate_delay(self) -> float:
"""Generate computation delay based on configuration"""
mean = self.delay_config.get('mean', 0.002)
std_dev = self.delay_config.get('std_dev', 0.0005)
return max(0, random.gauss(mean, std_dev))from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

class InferenceRequest(BaseModel):
    request_id: str
    timestamp: str
    device_times: Optional[List[float]] = None

app = FastAPI()

@app.post("/inference")
async def handle_inference(request: InferenceRequest):
    """Handle inference request with variance detection and local inference mode"""
    try:
        handler = RequestHandler()
        return handler.handle_inference_request(request.dict())
    except Exception as e:
        logger.error(f"Inference request failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/results")
async def handle_results(request: InferenceRequest):
    """Handle inference results from client"""
    if request.device_times:
        # Feed each layer's measured time to the module-level variance detector.
        for layer_idx, layer_time in enumerate(request.device_times):
            variance_detector.add_measurement(layer_idx, layer_time)
    return {"status": "success"}

import asyncio
import websockets
import json
class WebsocketServer:
def __init__(self, host: str, port: int, endpoint: str):
self.host = host
self.port = port
self.endpoint = endpoint
self.request_handler = RequestHandler()
async def handle_client(self, websocket, path):
"""Handle WebSocket client connection"""
try:
async for message in websocket:
request_data = json.loads(message)
response = self.request_handler.handle_inference_request(request_data)
await websocket.send(json.dumps(response))
except websockets.exceptions.ConnectionClosed:
logger.info("Client disconnected")
except Exception as e:
logger.error(f"WebSocket error: {e}")
def run(self):
"""Start WebSocket server"""
start_server = websockets.serve(self.handle_client, self.host, self.port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()Objective: Minimize total inference time T_total = T_device + T_network + T_edge
def calculate_optimal_offloading_layer(device_times: List[float],
edge_times: List[float],
network_latency: float) -> int:
"""Calculate optimal offloading point using dynamic programming"""
min_total_time = float('inf')
optimal_layer = len(device_times) - 1 # Default: all local
for split_point in range(len(device_times)):
# Calculate device time (layers 0 to split_point)
device_time = sum(device_times[:split_point + 1])
# Calculate edge time (layers split_point+1 to end)
edge_time = sum(edge_times[split_point + 1:])
# Total time includes network latency
total_time = device_time + network_latency + edge_time
if total_time < min_total_time:
min_total_time = total_time
optimal_layer = split_point
return optimal_layerPurpose: Smooth timing measurements to reduce noise.
class EMATimer:
    """Exponentially weighted moving average over timing samples."""

    def __init__(self, alpha: float = 0.2):
        self.alpha = alpha  # Smoothing factor
        self.ema_value = None

    def update(self, new_value: float) -> float:
        """Fold one sample into the EMA and return the smoothed value."""
        if self.ema_value is None:
            # First sample seeds the average.
            self.ema_value = new_value
        else:
            self.ema_value = self.alpha * new_value + (1 - self.alpha) * self.ema_value
        return self.ema_value

class NetworkLatencyEstimator:
    """EMA-based estimator of round-trip network latency."""

    def __init__(self):
        self.ema_timer = EMATimer(alpha=0.3)
        self.last_measurements = deque(maxlen=5)

    def measure_latency(self, start_time: float, end_time: float) -> float:
        """Record one latency sample and return the updated EMA."""
        sample = end_time - start_time
        self.last_measurements.append(sample)
        return self.ema_timer.update(sample)

    def get_estimated_latency(self) -> float:
        """Current EMA, or a 50 ms default before any sample arrives."""
        current = self.ema_timer.ema_value
        return 0.050 if current is None else current

import pytest
from src.server.variance_detector import VarianceDetector
class TestVarianceDetector:
def test_stable_measurements(self):
"""Test that stable measurements don't trigger variance"""
detector = VarianceDetector()
# Add stable measurements
for _ in range(10):
result = detector.add_measurement(0, 0.001) # 1ms ± 0
assert not result['needs_retest']
def test_variance_detection(self):
"""Test that high variance triggers detection"""
detector = VarianceDetector()
# Add varying measurements
times = [0.001, 0.001, 0.001, 0.002, 0.003, 0.001, 0.004]
needs_retest = False
for time in times:
result = detector.add_measurement(0, time)
if result['needs_retest']:
needs_retest = True
assert needs_retest
def test_cascade_propagation(self):
"""Test that variance cascades to next layer"""
detector = VarianceDetector(cascade_enabled=True)
# Create variance in layer 0
for time in [0.001, 0.001, 0.003, 0.001, 0.004]:
detector.add_measurement(0, time)
# Add measurements to layer 1
for time in [0.002, 0.002, 0.006, 0.002]:
detector.add_measurement(1, time)
# Trigger variance check
result = detector.add_measurement(0, 0.005)
if result['needs_retest']:
# Should cascade to layer 1 if it's also unstable
assert len(result['cascaded_layers']) >= 0class TestClientServerIntegration:
def test_normal_inference_flow(self):
"""Test complete inference flow"""
# Start server
server = self.start_test_server()
# Run client inference
client = HttpClient('test_config.yaml')
result = client.run_inference_cycle()
assert result is not None
server.stop()
def test_local_inference_mode(self):
"""Test local inference mode forcing"""
# Configure server with 100% local inference
config = {'local_inference_mode': {'enabled': True, 'probability': 1.0}}
server = self.start_test_server(config)
client = HttpClient('test_config.yaml')
response = client.send_inference_request({})
assert response['offloading_layer'] == -1
server.stop()
def test_client_resilience(self):
"""Test client behavior when server is down"""
# No server running
client = HttpClient('test_config.yaml')
# Should not crash and should fall back to local inference
result = client.run_inference_cycle()
assert result is not Nonedef test_performance_benchmark():
"""Benchmark system performance under load"""
import time
import concurrent.futures
def run_inference():
client = HttpClient('test_config.yaml')
start = time.time()
client.run_inference_cycle()
return time.time() - start
# Run 100 concurrent inferences
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [executor.submit(run_inference) for _ in range(100)]
times = [f.result() for f in concurrent.futures.as_completed(futures)]
avg_time = sum(times) / len(times)
print(f"Average inference time: {avg_time:.3f}s")
assert avg_time < 2.0 # Should complete within 2 secondsclass ModelCache:
def __init__(self, max_size: int = 10):
self.cache = {}
self.max_size = max_size
self.access_times = {}
def get_model(self, model_path: str):
"""Get model with LRU caching"""
if model_path in self.cache:
self.access_times[model_path] = time.time()
return self.cache[model_path]
# Load model
interpreter = tf.lite.Interpreter(model_path=model_path)
interpreter.allocate_tensors()
# Evict oldest if cache full
if len(self.cache) >= self.max_size:
oldest = min(self.access_times.keys(), key=lambda k: self.access_times[k])
del self.cache[oldest]
del self.access_times[oldest]
self.cache[model_path] = interpreter
self.access_times[model_path] = time.time()
return interpreterdef setup_tensorflow_config():
"""Optimize TensorFlow configuration"""
import tensorflow as tf
# Configure GPU memory growth
physical_devices = tf.config.list_physical_devices('GPU')
for device in physical_devices:
tf.config.experimental.set_memory_growth(device, True)
# Set threading
tf.config.threading.set_inter_op_parallelism_threads(0) # Use all cores
tf.config.threading.set_intra_op_parallelism_threads(0)class CustomOffloadingAlgorithm:
def calculate_offloading_layer(self, device_times: List[float],
edge_times: List[float],
context: dict) -> int:
"""Implement custom offloading logic"""
# Your algorithm here
return optimal_layer
# Register algorithm
offloading_algorithms['custom'] = CustomOffloadingAlgorithm()class CustomVarianceDetector:
def add_measurement(self, layer_id: int, time: float) -> dict:
"""Custom variance detection logic"""
# Your detection algorithm
return {'needs_retest': False, 'cascaded_layers': []}class CustomProtocolServer:
def __init__(self, config: dict):
self.config = config
def handle_request(self, request: dict) -> dict:
"""Custom protocol handling"""
return response
def start(self):
"""Start custom server"""
pass# production_settings.yaml
communication:
  http:
    host: "0.0.0.0"
    port: 80  # Standard HTTP port
local_inference_mode:
  enabled: true
  probability: 0.05  # Lower for production
variance_detection:
  window_size: 20  # More stable
  cv_threshold: 0.10  # More sensitive
delay_simulation:
  computation:
    enabled: false  # Disable in production
  network:
    enabled: false  # Disable in production

import logging
from logging.handlers import RotatingFileHandler
# Configure production logging
handler = RotatingFileHandler('sciot.log', maxBytes=10**7, backupCount=5)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)- Use HTTPS in production
- Implement authentication for inference endpoints
- Validate all input data
- Rate limiting for inference requests
- Monitor for abnormal traffic patterns
This guide provides the technical foundation for understanding and extending the SCIoT system. For usage instructions, see USER_GUIDE.md.