
SCIoT Developer Guide - Technical Implementation

This guide provides detailed technical information for developers working with or extending the SCIoT system.

Architecture Overview

SCIoT implements a distributed split computing architecture with intelligent offloading, variance detection, and client resilience.

┌──────────────┐    ┌──────────────────┐    ┌──────────────┐
│    Device    │◄──►│   Edge Server    │◄──►│Other Clients │
│              │    │                  │    │              │
│  Layer 0-N   │    │  Layer N+1-58    │    │  Layer 0-M   │
│  Local Inf.  │    │ Variance Detect. │    │  Local Inf.  │
│  Resilience  │    │ Local Inf. Mode  │    │  Resilience  │
└──────────────┘    └──────────────────┘    └──────────────┘

Core Components

1. Variance Detection System

Purpose: Monitor inference time stability and trigger re-evaluation when performance changes significantly.

InferenceTimeHistory Class

import statistics
from collections import deque

class InferenceTimeHistory:
    def __init__(self, layer_id: int, window_size: int = 10):
        self.layer_id = layer_id
        self.window_size = window_size
        self.measurements = deque(maxlen=window_size)
    
    def add_measurement(self, time: float) -> bool:
        """Add measurement and return True if variance threshold exceeded"""
        self.measurements.append(time)
        return not self.is_stable()
    
    def is_stable(self, cv_threshold: float = 0.15) -> bool:
        """Check if coefficient of variation is below threshold"""
        if len(self.measurements) < 3:
            return True
        
        mean_time = statistics.mean(self.measurements)
        std_time = statistics.stdev(self.measurements)
        cv = std_time / mean_time if mean_time > 0 else 0
        
        return cv <= cv_threshold

Key algorithms:

  • Coefficient of Variation (CV): CV = σ/μ - normalized measure independent of scale
  • Sliding Window: Last N measurements (default: 10)
  • Threshold: 15% CV triggers re-evaluation
  • Cascade Propagation: Variance at layer i triggers check at layer i+1
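As a quick sanity check of the threshold, here is the CV computation applied to two illustrative measurement windows (the values are made up for the example):

```python
import statistics

def coefficient_of_variation(samples):
    """CV = sigma / mu; scale-independent, so it works for fast and slow layers alike."""
    mean = statistics.mean(samples)
    return statistics.stdev(samples) / mean if mean > 0 else 0.0

stable = [0.010, 0.010, 0.011, 0.010]    # ~1 ms jitter on a 10 ms layer
unstable = [0.010, 0.020, 0.010, 0.030]  # large swings between runs

print(coefficient_of_variation(stable) <= 0.15)   # stays under the 15% threshold
print(coefficient_of_variation(unstable) > 0.15)  # would trigger re-evaluation
```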

VarianceDetector Class

from typing import Any, Dict

class VarianceDetector:
    def __init__(self, window_size: int = 10, variance_threshold: float = 0.15,
                 cascade_enabled: bool = True):
        self.window_size = window_size
        self.variance_threshold = variance_threshold
        self.cascade_enabled = cascade_enabled
        self.layer_histories: Dict[int, InferenceTimeHistory] = {}
    
    def add_measurement(self, layer_id: int, time: float) -> Dict[str, Any]:
        """Add measurement and return detection result"""
        if layer_id not in self.layer_histories:
            self.layer_histories[layer_id] = InferenceTimeHistory(layer_id, self.window_size)
        
        history = self.layer_histories[layer_id]
        history.measurements.append(time)
        needs_retest = not history.is_stable(self.variance_threshold)
        
        result = {
            'needs_retest': needs_retest,
            'cascaded_layers': []
        }
        
        # Cascade to the next layer if enabled and variance was detected
        if needs_retest and self.cascade_enabled:
            next_layer = layer_id + 1
            if next_layer in self.layer_histories:
                if not self.layer_histories[next_layer].is_stable(self.variance_threshold):
                    result['cascaded_layers'].append(next_layer)
        
        return result

2. Local Inference Mode

Purpose: Periodically force the device to run all layers locally to refresh timing measurements and validate performance.

Implementation in RequestHandler

import random

class RequestHandler:
    def __init__(self):
        self.local_inference_config = self._load_local_inference_config()
    
    def handle_inference_request(self, request_data: dict) -> dict:
        """Process inference request with optional local inference forcing"""
        
        # Check if should force local inference
        if self._should_force_local_inference():
            logger.info("📍 LOCAL INFERENCE forced (probability hit)")
            return {
                'offloading_layer': -1,  # Special value for "all local"
                'message': 'Local inference mode'
            }
        
        # Normal offloading algorithm
        offloading_layer = self._calculate_offloading_layer(request_data)
        return {
            'offloading_layer': offloading_layer,
            'message': 'Normal offloading'
        }
    
    def _should_force_local_inference(self) -> bool:
        """Determine if current request should force local inference"""
        if not self.local_inference_config.get('enabled', False):
            return False
        
        probability = self.local_inference_config.get('probability', 0.1)
        return random.random() < probability
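With the default probability of 0.1, roughly one request in ten is answered with `offloading_layer = -1`. A quick simulation of that coin flip (the seed is only there to make this sketch reproducible):

```python
import random

random.seed(0)
probability = 0.1  # the default 'probability' from local_inference_config
forced = sum(random.random() < probability for _ in range(10_000))
print(forced)  # close to 1_000 of 10_000 requests forced local
```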

Client-Side Handling

def run_split_inference(image, tflite_dir, stop_layer):
    """Run inference with support for local inference mode (-1)"""
    input_data = image
    inference_times = []
    
    # Handle -1 as "run all layers until the end"
    if stop_layer == -1:
        stop_layer = LAST_OFFLOADING_LAYER  # 58 for FOMO 96x96
        print(f"Offloading layer -1: Running all {stop_layer + 1} layers locally")
    
    # Run inference layers
    for i in range(stop_layer + 1):
        model_path = f"{tflite_dir}/model_{i}.tflite"
        start_time = time.perf_counter()
        
        # Load and run layer
        interpreter = tf.lite.Interpreter(model_path=model_path)
        interpreter.allocate_tensors()
        interpreter.set_tensor(interpreter.get_input_details()[0]['index'], input_data)
        interpreter.invoke()
        output_data = interpreter.get_tensor(interpreter.get_output_details()[0]['index'])
        
        end_time = time.perf_counter()
        inference_times.append(end_time - start_time)
        input_data = output_data
    
    return input_data, inference_times
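The loop above simply threads each layer's output tensor into the next layer's input. The same handoff can be sketched without TensorFlow, using plain functions as stand-ins for the `model_i.tflite` interpreters:

```python
import time

def run_chain(x, layers, stop_layer):
    """Run layers 0..stop_layer, feeding each output into the next layer's input."""
    if stop_layer == -1:              # -1 means "run everything locally"
        stop_layer = len(layers) - 1
    times = []
    for layer in layers[: stop_layer + 1]:
        start = time.perf_counter()
        x = layer(x)                  # stand-in for interpreter.invoke()
        times.append(time.perf_counter() - start)
    return x, times

layers = [lambda v: v + 1, lambda v: v * 2, lambda v: v - 3]
out, times = run_chain(10, layers, stop_layer=-1)
print(out, len(times))  # ((10 + 1) * 2) - 3 = 19, with three per-layer timings
```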

3. Client Resilience System

Purpose: Handle network failures gracefully without crashing or losing data.

HTTP Client Resilience

class HttpClient:
    def __init__(self, config_path: str):
        self.config = self._load_config(config_path)
        self.timeout = self.config['server']['timeout']
        self.base_url = self.config['server']['base_url']
        self.retry_count = 3
        self.retry_delay = 1.0  # Initial delay
        
    def send_inference_request(self, data: dict) -> Optional[dict]:
        """Send request with retry logic and timeout handling"""
        for attempt in range(self.retry_count):
            try:
                response = requests.post(
                    f"{self.base_url}{self.config['server']['inference_endpoint']}",
                    json=data,
                    timeout=self.timeout
                )
                
                if response.status_code == 200:
                    return response.json()
                else:
                    logger.warning(f"Server returned status {response.status_code}")
                    
            except requests.exceptions.Timeout:
                logger.warning(f"Request timeout (attempt {attempt + 1}/{self.retry_count})")
            except requests.exceptions.ConnectionError:
                logger.warning(f"Connection error (attempt {attempt + 1}/{self.retry_count})")
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
            
            # Exponential backoff
            if attempt < self.retry_count - 1:
                time.sleep(self.retry_delay * (2 ** attempt))
        
        logger.error("All retry attempts failed, falling back to local inference")
        return None
    
    def run_inference_cycle(self):
        """Run inference with graceful degradation"""
        image = self._load_image()
        
        # Try server offloading first
        server_response = self.send_inference_request({
            'request_id': str(uuid.uuid4()),
            'timestamp': datetime.now().isoformat()
        })
        
        if server_response:
            # Normal split computing
            offloading_layer = server_response['offloading_layer']
            output, device_times = run_split_inference(image, self.tflite_dir, offloading_layer)
            
            # Send results back
            self._send_results(device_times, offloading_layer)
        else:
            # Fallback to local-only inference
            logger.info("🔄 Falling back to local-only inference")
            output, device_times = run_split_inference(image, self.tflite_dir, LAST_OFFLOADING_LAYER)
            
        return output
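With `retry_count = 3` and an initial `retry_delay` of 1.0 s, the exponential backoff in `send_inference_request` sleeps for the following durations (no sleep follows the final attempt):

```python
retry_count, retry_delay = 3, 1.0
delays = [retry_delay * (2 ** attempt) for attempt in range(retry_count - 1)]
print(delays)  # [1.0, 2.0]
```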

4. Model Management

ModelManager Class

class ModelManager:
    def __init__(self, inference_times: dict, 
                 computation_delay_config: dict = None,
                 variance_detector: VarianceDetector = None):
        self.inference_times = inference_times
        self.delay_config = computation_delay_config
        self.variance_detector = variance_detector
        self.loaded_models = {}
    
    def get_inference_time_with_variance_check(self, layer_id: int, measured_time: float) -> float:
        """Get inference time and check for variance"""
        if self.variance_detector:
            result = self.variance_detector.add_measurement(layer_id, measured_time)
            
            if result['needs_retest']:
                logger.warning(f"⚠️ VARIANCE DETECTED at layer {layer_id}")
                
            if result['cascaded_layers']:
                for cascaded_layer in result['cascaded_layers']:
                    logger.info(f"🔄 CASCADE: Variance propagated to layer {cascaded_layer}")
        
        # Apply delay simulation if configured
        if self.delay_config and self.delay_config.get('enabled', False):
            delay = self._generate_delay()
            return measured_time + delay
            
        return measured_time
    
    def _generate_delay(self) -> float:
        """Generate computation delay based on configuration"""
        mean = self.delay_config.get('mean', 0.002)
        std_dev = self.delay_config.get('std_dev', 0.0005)
        return max(0, random.gauss(mean, std_dev))
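Because `_generate_delay` clamps the Gaussian draw at zero, simulated delays are never negative and average out near the configured mean. A seeded check:

```python
import random

random.seed(42)  # only to make the sketch deterministic
mean, std_dev = 0.002, 0.0005  # the defaults used above
delays = [max(0, random.gauss(mean, std_dev)) for _ in range(1000)]

print(min(delays) >= 0)                             # never negative
print(0.0015 < sum(delays) / len(delays) < 0.0025)  # averages near the mean
```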

5. Communication Protocols

HTTP Server (FastAPI)

from typing import List, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

class InferenceRequest(BaseModel):
    request_id: str
    timestamp: str
    device_times: Optional[List[float]] = None

app = FastAPI()

@app.post("/inference")
async def handle_inference(request: InferenceRequest):
    """Handle inference request with variance detection and local inference mode"""
    try:
        handler = RequestHandler()
        response = handler.handle_inference_request(request.dict())
        return response
    except Exception as e:
        logger.error(f"Inference request failed: {e}")
        raise HTTPException(status_code=500, detail=str(e))

# A module-level detector instance shared across requests (assumed here)
variance_detector = VarianceDetector()

@app.post("/results")
async def handle_results(request: InferenceRequest):
    """Handle inference results from client"""
    if request.device_times:
        # Record each per-layer device timing measurement
        for layer_id, measured_time in enumerate(request.device_times):
            variance_detector.add_measurement(layer_id, measured_time)
    
    return {"status": "success"}
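Both endpoints exchange plain JSON bodies shaped like `InferenceRequest`. A stdlib-only sketch of the payload a client would POST (field values are illustrative):

```python
import json
import uuid
from datetime import datetime

payload = {
    "request_id": str(uuid.uuid4()),
    "timestamp": datetime.now().isoformat(),
    "device_times": [0.0011, 0.0009, 0.0012],  # per-layer timings for /results
}

encoded = json.dumps(payload)
decoded = json.loads(encoded)
print(decoded == payload)  # strings and floats round-trip intact
```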

WebSocket Server

import asyncio
import websockets
import json

class WebsocketServer:
    def __init__(self, host: str, port: int, endpoint: str):
        self.host = host
        self.port = port  
        self.endpoint = endpoint
        self.request_handler = RequestHandler()
    
    async def handle_client(self, websocket, path=None):  # newer websockets versions omit `path`
        """Handle WebSocket client connection"""
        try:
            async for message in websocket:
                request_data = json.loads(message)
                response = self.request_handler.handle_inference_request(request_data)
                await websocket.send(json.dumps(response))
                
        except websockets.exceptions.ConnectionClosed:
            logger.info("Client disconnected")
        except Exception as e:
            logger.error(f"WebSocket error: {e}")
    
    def run(self):
        """Start WebSocket server"""
        async def main():
            async with websockets.serve(self.handle_client, self.host, self.port):
                await asyncio.Future()  # run until cancelled
        
        asyncio.run(main())

Algorithm Details

1. Offloading Decision Algorithm

Objective: Minimize total inference time T_total = T_device + T_network + T_edge

def calculate_optimal_offloading_layer(device_times: List[float], 
                                     edge_times: List[float],
                                     network_latency: float) -> int:
    """Calculate the optimal offloading point with a linear scan over all split points"""
    
    min_total_time = float('inf')
    optimal_layer = len(device_times) - 1  # Default: all local
    
    for split_point in range(len(device_times)):
        # Calculate device time (layers 0 to split_point)
        device_time = sum(device_times[:split_point + 1])
        
        # Calculate edge time (layers split_point+1 to end)
        edge_time = sum(edge_times[split_point + 1:])
        
        # Total time includes network latency
        total_time = device_time + network_latency + edge_time
        
        if total_time < min_total_time:
            min_total_time = total_time
            optimal_layer = split_point
    
    return optimal_layer
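A worked run with made-up numbers: when the device is slow relative to the edge, the scan picks the earliest split so that most layers run on the edge.

```python
def optimal_split(device_times, edge_times, network_latency):
    """Same scan as above: total = device prefix + link latency + edge suffix."""
    best_layer, best_time = len(device_times) - 1, float("inf")
    for split in range(len(device_times)):
        total = sum(device_times[:split + 1]) + network_latency + sum(edge_times[split + 1:])
        if total < best_time:
            best_layer, best_time = split, total
    return best_layer, best_time

device = [0.004, 0.004, 0.004, 0.004]  # 4 ms per layer on the device
edge = [0.001, 0.001, 0.001, 0.001]    # 1 ms per layer on the edge
layer, total = optimal_split(device, edge, network_latency=0.005)
print(layer, round(total, 3))  # splits after layer 0: 0.004 + 0.005 + 0.003 = 0.012
```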

2. Exponential Moving Average (EMA) Time Smoothing

Purpose: Smooth timing measurements to reduce noise.

class EMATimer:
    def __init__(self, alpha: float = 0.2):
        self.alpha = alpha  # Smoothing factor
        self.ema_value = None
    
    def update(self, new_value: float) -> float:
        """Update EMA with new measurement"""
        if self.ema_value is None:
            self.ema_value = new_value
        else:
            self.ema_value = self.alpha * new_value + (1 - self.alpha) * self.ema_value
        
        return self.ema_value
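A worked run of the update rule with `alpha = 0.2` (the first sample seeds the average; later samples pull it by 20% per step):

```python
class EMATimer:
    def __init__(self, alpha: float = 0.2):
        self.alpha = alpha
        self.ema_value = None
    
    def update(self, new_value: float) -> float:
        if self.ema_value is None:
            self.ema_value = new_value  # seed with the first measurement
        else:
            self.ema_value = self.alpha * new_value + (1 - self.alpha) * self.ema_value
        return self.ema_value

timer = EMATimer(alpha=0.2)
print(timer.update(0.010))            # seeds at the first measurement
print(round(timer.update(0.020), 6))  # 0.2 * 0.020 + 0.8 * 0.010 = 0.012
print(round(timer.update(0.020), 6))  # 0.2 * 0.020 + 0.8 * 0.012 = 0.0136
```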

3. Network Latency Estimation

class NetworkLatencyEstimator:
    def __init__(self):
        self.ema_timer = EMATimer(alpha=0.3)
        self.last_measurements = deque(maxlen=5)
    
    def measure_latency(self, start_time: float, end_time: float) -> float:
        """Measure and update network latency estimate"""
        latency = end_time - start_time
        self.last_measurements.append(latency)
        return self.ema_timer.update(latency)
    
    def get_estimated_latency(self) -> float:
        """Get current latency estimate"""
        if self.ema_timer.ema_value is None:
            return 0.050  # Default: 50ms
        return self.ema_timer.ema_value

Testing Framework

1. Unit Tests

import pytest
from src.server.variance_detector import VarianceDetector

class TestVarianceDetector:
    def test_stable_measurements(self):
        """Test that stable measurements don't trigger variance"""
        detector = VarianceDetector()
        
        # Add stable measurements
        for _ in range(10):
            result = detector.add_measurement(0, 0.001)  # 1ms ± 0
            assert not result['needs_retest']
    
    def test_variance_detection(self):
        """Test that high variance triggers detection"""
        detector = VarianceDetector()
        
        # Add varying measurements  
        times = [0.001, 0.001, 0.001, 0.002, 0.003, 0.001, 0.004]
        needs_retest = False
        
        for time in times:
            result = detector.add_measurement(0, time)
            if result['needs_retest']:
                needs_retest = True
                
        assert needs_retest
    
    def test_cascade_propagation(self):
        """Test that variance cascades to next layer"""
        detector = VarianceDetector(cascade_enabled=True)
        
        # Create variance in layer 0
        for time in [0.001, 0.001, 0.003, 0.001, 0.004]:
            detector.add_measurement(0, time)
            
        # Add measurements to layer 1
        for time in [0.002, 0.002, 0.006, 0.002]:
            detector.add_measurement(1, time)
            
        # Trigger variance check
        result = detector.add_measurement(0, 0.005)
        
        if result['needs_retest']:
            # Layer 1 is also unstable, so the cascade should reach it
            assert 1 in result['cascaded_layers']

2. Integration Tests

class TestClientServerIntegration:
    def test_normal_inference_flow(self):
        """Test complete inference flow"""
        # Start server
        server = self.start_test_server()
        
        # Run client inference
        client = HttpClient('test_config.yaml')
        result = client.run_inference_cycle()
        
        assert result is not None
        server.stop()
    
    def test_local_inference_mode(self):
        """Test local inference mode forcing"""
        # Configure server with 100% local inference
        config = {'local_inference_mode': {'enabled': True, 'probability': 1.0}}
        server = self.start_test_server(config)
        
        client = HttpClient('test_config.yaml')
        response = client.send_inference_request({})
        
        assert response['offloading_layer'] == -1
        server.stop()
    
    def test_client_resilience(self):
        """Test client behavior when server is down"""
        # No server running
        client = HttpClient('test_config.yaml')
        
        # Should not crash and should fall back to local inference
        result = client.run_inference_cycle()
        assert result is not None

3. Performance Tests

def test_performance_benchmark():
    """Benchmark system performance under load"""
    import time
    import concurrent.futures
    
    def run_inference():
        client = HttpClient('test_config.yaml')
        start = time.time()
        client.run_inference_cycle()
        return time.time() - start
    
    # Run 100 concurrent inferences
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(run_inference) for _ in range(100)]
        times = [f.result() for f in concurrent.futures.as_completed(futures)]
    
    avg_time = sum(times) / len(times)
    print(f"Average inference time: {avg_time:.3f}s")
    assert avg_time < 2.0  # Should complete within 2 seconds

Performance Optimization

1. Model Loading Optimization

class ModelCache:
    def __init__(self, max_size: int = 10):
        self.cache = {}
        self.max_size = max_size
        self.access_times = {}
    
    def get_model(self, model_path: str):
        """Get model with LRU caching"""
        if model_path in self.cache:
            self.access_times[model_path] = time.time()
            return self.cache[model_path]
        
        # Load model
        interpreter = tf.lite.Interpreter(model_path=model_path)
        interpreter.allocate_tensors()
        
        # Evict oldest if cache full
        if len(self.cache) >= self.max_size:
            oldest = min(self.access_times.keys(), key=lambda k: self.access_times[k])
            del self.cache[oldest]
            del self.access_times[oldest]
        
        self.cache[model_path] = interpreter
        self.access_times[model_path] = time.time()
        return interpreter
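The eviction policy above is plain LRU. Here is the same policy isolated from TFLite (a logical counter stands in for `time.time()` so the sketch is deterministic; the `loader` callback stands in for interpreter construction):

```python
from itertools import count

class LRUCache:
    """Same least-recently-used eviction as ModelCache above, minus the TFLite loading."""
    def __init__(self, max_size: int = 2):
        self.max_size = max_size
        self.cache = {}
        self.access_order = {}
        self._clock = count()  # deterministic stand-in for time.time()
    
    def get(self, key, loader):
        if key in self.cache:
            self.access_order[key] = next(self._clock)
            return self.cache[key]
        # Evict the least recently used entry if the cache is full
        if len(self.cache) >= self.max_size:
            oldest = min(self.access_order, key=self.access_order.get)
            del self.cache[oldest]
            del self.access_order[oldest]
        self.cache[key] = loader(key)
        self.access_order[key] = next(self._clock)
        return self.cache[key]

cache = LRUCache(max_size=2)
for name in ["model_0", "model_1", "model_0", "model_2"]:
    cache.get(name, loader=lambda k: f"<interpreter for {k}>")
print(sorted(cache.cache))  # model_1 was evicted as least recently used
```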

2. Memory Management

def setup_tensorflow_config():
    """Optimize TensorFlow configuration"""
    import tensorflow as tf
    
    # Configure GPU memory growth
    physical_devices = tf.config.list_physical_devices('GPU')
    for device in physical_devices:
        tf.config.experimental.set_memory_growth(device, True)
    
    # Set threading
    tf.config.threading.set_inter_op_parallelism_threads(0)  # 0 = let TensorFlow choose
    tf.config.threading.set_intra_op_parallelism_threads(0)

Extension Points

1. Custom Offloading Algorithms

class CustomOffloadingAlgorithm:
    def calculate_offloading_layer(self, device_times: List[float],
                                 edge_times: List[float],
                                 context: dict) -> int:
        """Implement custom offloading logic"""
        # Your algorithm here
        return optimal_layer
        
# Register algorithm
offloading_algorithms['custom'] = CustomOffloadingAlgorithm()

2. Custom Variance Detectors

class CustomVarianceDetector:
    def add_measurement(self, layer_id: int, time: float) -> dict:
        """Custom variance detection logic"""
        # Your detection algorithm
        return {'needs_retest': False, 'cascaded_layers': []}

3. Communication Protocol Extensions

class CustomProtocolServer:
    def __init__(self, config: dict):
        self.config = config
    
    def handle_request(self, request: dict) -> dict:
        """Custom protocol handling"""
        return response
        
    def start(self):
        """Start custom server"""
        pass

Deployment Considerations

1. Production Configuration

# production_settings.yaml
communication:
  http:
    host: "0.0.0.0"
    port: 80  # Standard HTTP port
    
local_inference_mode:
  enabled: true
  probability: 0.05  # Lower for production
  
variance_detection:
  window_size: 20     # More stable
  cv_threshold: 0.10  # More sensitive
  
delay_simulation:
  computation:
    enabled: false  # Disable in production
  network:
    enabled: false  # Disable in production

2. Monitoring and Logging

import logging
from logging.handlers import RotatingFileHandler

# Configure production logging
logger = logging.getLogger("sciot")
logger.setLevel(logging.INFO)
handler = RotatingFileHandler('sciot.log', maxBytes=10**7, backupCount=5)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)

3. Security Considerations

  • Use HTTPS in production
  • Implement authentication for inference endpoints
  • Validate all input data
  • Rate limiting for inference requests
  • Monitor for abnormal traffic patterns
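As one concrete sketch of the rate-limiting bullet, a minimal in-process token bucket (production deployments would more often enforce limits in a reverse proxy or API-gateway middleware; the rates here are illustrative):

```python
import time

class TokenBucket:
    """Allow bursts up to `capacity` requests, refilling at `rate` tokens per second."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = float(capacity)
        self.tokens = self.capacity
        self.last = time.monotonic()
    
    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at the bucket capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=10.0, capacity=5)
admitted = [bucket.allow() for _ in range(8)]
print(admitted.count(True))  # only the burst capacity (5) is admitted at once
```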

This guide provides the technical foundation for understanding and extending the SCIoT system. For usage instructions, see USER_GUIDE.md.