TheJisus28/cronjob-polling
Polling System Architecture

Complete guide to understanding the polling system, its components, and data flow.


Table of Contents

  1. Overview
  2. System Architecture
  3. Components
  4. Data Flow
  5. How It Works
  6. Configuration
  7. Monitoring

Overview

The polling system is responsible for automatically monitoring HTTP endpoints at configured intervals. It consists of three main components working together:

  • Coordinator - Orchestrates the entire system
  • Scheduler - Manages timing and project monitoring
  • Executor - Executes HTTP requests with controlled concurrency
graph TB
    A[Project Created in DB] --> B[Coordinator]
    B --> C[Scheduler]
    B --> D[Executor Pool]
    B --> E[Result Processor]

    C -->|Tasks| D
    D -->|Results| E
    E -->|Save| F[(Database)]

    style B fill:#4CAF50
    style C fill:#2196F3
    style D fill:#FF9800
    style E fill:#9C27B0

System Architecture

High-Level Component Diagram

graph LR
    subgraph "Polling Coordinator"
        A[Scheduler] -->|ProjectTask| B[Task Queue]
        B --> C[Executor Pool]
        C -->|PollingResult| D[Result Queue]
        D --> E[Result Processor]
    end

    F[(PostgreSQL)] -.->|Load Projects| A
    E -->|Save Results| F

    style A fill:#64B5F6
    style C fill:#FFB74D
    style E fill:#BA68C8
    style F fill:#81C784

Component Responsibilities

| Component | Responsibility | Stateful? |
|-----------|----------------|-----------|
| Coordinator | Orchestrates all components, manages lifecycle | Yes (in-memory state) |
| Scheduler | Maintains project tickers, emits tasks | Yes (project map) |
| Executor | Executes HTTP requests with concurrency control | No (stateless) |
| Result Processor | Consumes results and saves to DB | No (stateless) |

Components

1. Coordinator

File: poller.go

The main orchestrator that ties everything together.

graph TD
    A[Coordinator] --> B[Start]
    B --> C[Initialize Executor]
    B --> D[Start Scheduler]
    B --> E[Start Result Processor]
    B --> F[Start Project Reloader]

    A --> G[Stop]
    G --> H[Signal Shutdown]
    H --> I[Stop Scheduler]
    H --> J[Stop Executor]
    H --> K[Wait for Goroutines]

    style A fill:#4CAF50
    style B fill:#2196F3
    style G fill:#F44336

Key Methods:

// Create coordinator
coordinator := NewCoordinator(projectRepo, pollResultRepo, maxConcurrentHTTP)

// Start the system
coordinator.Start()

// Manage projects
coordinator.AddProject(project)
coordinator.RemoveProject(projectID)
coordinator.ReloadProjects()

// Get status
status := coordinator.GetStatus()

// Stop the system
coordinator.Stop()

Background Goroutines:

  1. processResults() - Consumes results from resultQueue and saves to database
  2. reloadProjectsPeriodically() - Reloads all projects every 5 minutes

2. Scheduler

File: scheduler.go

Manages the timing of polling tasks. Creates a ticker for each active project.

graph TD
    A[Scheduler] --> B[Project Map]
    B --> C[Project 1 - Ticker 5min]
    B --> D[Project 2 - Ticker 1min]
    B --> E[Project 3 - Ticker 30sec]

    C -->|Tick| F[Create Task]
    D -->|Tick| F
    E -->|Tick| F

    F --> G[Task Queue]

    style A fill:#64B5F6
    style B fill:#FFE082
    style G fill:#A5D6A7

How It Works:

// For each project
duration := project.GetDuration() // e.g., 5 minutes
ticker := time.NewTicker(duration)

go func() {
    defer ticker.Stop()
    for {
        select {
        case <-monitor.StopCh:
            return // project disabled/removed: stop this monitor
        case <-ticker.C:
            // Time to poll!
            task := ProjectTask{Project: project}
            taskQueue <- task
        }
    }
}()

Project Lifecycle:

sequenceDiagram
    participant User
    participant API
    participant Scheduler
    participant TQ as Task Queue

    User->>API: Create Project (enabled=true)
    API->>Scheduler: AddProject(project)
    Scheduler->>Scheduler: Create Ticker

    loop Every interval
        Scheduler->>TQ: Publish ProjectTask
    end

    User->>API: Disable Project
    API->>Scheduler: RemoveProject(id)
    Scheduler->>Scheduler: Stop Ticker

3. Executor Pool

File: executor.go

Executes HTTP polling tasks using semaphore-based concurrency control.

graph TD
    A[Task Queue] --> B{Task Consumer}
    B -->|Create| C[Goroutine 1]
    B -->|Create| D[Goroutine 2]
    B -->|Create| E[Goroutine N]

    C --> F{HTTP Semaphore}
    D --> F
    E --> F

    F -->|Slot Available| G[HTTP Request 1]
    F -->|Slot Available| H[HTTP Request 2]
    F -->|Wait...| I[Waiting...]

    G --> J[Release Slot]
    H --> J
    J --> K[Result Queue]

    style F fill:#FF9800
    style G fill:#4CAF50
    style H fill:#4CAF50
    style I fill:#FFC107

Concurrency Model:

Task arrives
    ↓
Create lightweight goroutine (~2KB)
    ↓
Wait on HTTP semaphore (may block)
    ↓
Acquire slot → Execute HTTP request
    ↓
Release slot → Send result → Exit goroutine

Key Characteristics:

  • Unlimited goroutines - One per task, minimal overhead
  • Limited HTTP - Controlled by semaphore (e.g., 500 concurrent)
  • No task drops - All tasks eventually execute
  • Natural backpressure - Slow tasks automatically queue
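
These characteristics can be demonstrated with a self-contained sketch of the pattern (not the repository's actual executor): one goroutine per task, with the expensive section gated by a semaphore channel. The peak concurrency inside the gated section never exceeds the limit:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// maxObservedConcurrency (illustrative helper) launches one goroutine per
// task but gates the "HTTP" section behind a semaphore channel, then
// reports the peak number of tasks inside the gated section.
func maxObservedConcurrency(tasks, limit int) int64 {
	sem := make(chan struct{}, limit)
	var active, peak atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire slot (blocks when full)
			defer func() { <-sem }() // release slot on exit
			cur := active.Add(1)
			for { // record the peak with a CAS loop
				p := peak.Load()
				if cur <= p || peak.CompareAndSwap(p, cur) {
					break
				}
			}
			active.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	// 1000 cheap goroutines, but at most 8 ever run the gated section at once.
	fmt.Println("peak concurrency:", maxObservedConcurrency(1000, 8))
}
```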

Metrics:

stats := executor.GetStats()

// ActiveTasks: Total goroutines (waiting + executing)
// ActiveHTTP: HTTP requests currently in progress
// TotalExecuted: All-time counter
// MaxConcurrentHTTP: Configured limit

4. Result Processor

File: poller.go (processResults method)

Consumes polling results and saves them to the database.

graph LR
    A[Result Queue] --> B{Result Processor}
    B -->|Success| C[Update Stats]
    B -->|Failure| C
    C --> D[Save to DB]
    D --> E[(poll_results)]
    C --> F[Log Result]

    style B fill:#BA68C8
    style E fill:#81C784

What It Does:

for result := range resultQueue {
    // 1. Update statistics
    totalRequests++
    if result.Success {
        successRequests++
    } else {
        failedRequests++
    }

    // 2. Save to database
    pollResultRepo.Create(result)

    // 3. Log result
    log.Printf("Project %d polled: %v", result.ProjectID, result.Success)
}

Data Flow

Complete System Flow

sequenceDiagram
    participant DB as PostgreSQL
    participant S as Scheduler
    participant TQ as Task Queue
    participant E as Executor
    participant RQ as Result Queue
    participant RP as Result Processor

    Note over DB,S: 1. System Startup
    S->>DB: Load enabled projects
    DB-->>S: [Project 1, Project 2, ...]
    S->>S: Create ticker for each project

    Note over S,TQ: 2. Task Generation
    loop Every interval
        S->>TQ: ProjectTask{project}
    end

    Note over TQ,E: 3. Task Execution
    TQ->>E: Consume task
    E->>E: Create goroutine
    E->>E: Wait on HTTP semaphore
    E->>E: Execute HTTP request
    E->>RQ: PollingResult

    Note over RQ,DB: 4. Result Processing
    RQ->>RP: Consume result
    RP->>RP: Update statistics
    RP->>DB: Save result

Detailed Execution Flow

flowchart TD
    A[Project ticker fires] --> B[Create ProjectTask]
    B --> C{Task Queue full?}
    C -->|No| D[Enqueue task]
    C -->|Yes| E[Block until space]
    E --> D

    D --> F[Task Consumer receives]
    F --> G[Create goroutine]
    G --> H{HTTP Semaphore slots?}
    H -->|Available| I[Acquire slot]
    H -->|Full| J[Wait for slot]
    J --> I

    I --> K[Execute HTTP GET/POST]
    K --> L{Response received?}
    L -->|Yes| M[Status code & time]
    L -->|No/Timeout| N[Error object]

    M --> O[Release semaphore]
    N --> O
    O --> P[Create PollingResult]
    P --> Q{Result Queue full?}
    Q -->|No| R[Enqueue result]
    Q -->|Yes| S[Drop result + log warning]

    R --> T[Result Processor]
    T --> U[Update stats]
    U --> V[Save to poll_results table]
    V --> W[Log result]
    W --> X[Done]

    style I fill:#4CAF50
    style K fill:#2196F3
    style O fill:#FF9800
    style V fill:#9C27B0

How It Works

Startup Sequence

sequenceDiagram
    participant M as main()
    participant C as Coordinator
    participant E as Executor
    participant S as Scheduler
    participant DB as Database

    M->>C: NewCoordinator(repos, 500)
    C->>E: NewExecutor(maxConcurrent=500)
    C->>S: NewScheduler(projectRepo)

    M->>C: Start()
    C->>E: Start()
    Note over E: Starts task consumer goroutine

    C->>S: Run()
    S->>DB: GetEnabled()
    DB-->>S: [enabled projects]
    S->>S: Create ticker for each
    Note over S: Background goroutines monitoring

    C->>C: Start background processors
    Note over C: - Result processor<br/>- Project reloader

    C-->>M: System ready ✅

Runtime Operation

stateDiagram-v2
    [*] --> Idle

    Idle --> TaskGenerated: Ticker fires
    TaskGenerated --> TaskQueued: Enqueue to channel
    TaskQueued --> GoroutineCreated: Consumer picks up

    GoroutineCreated --> WaitingSemaphore: Wait for HTTP slot
    WaitingSemaphore --> ExecutingHTTP: Slot acquired

    ExecutingHTTP --> HTTPSuccess: Got response
    ExecutingHTTP --> HTTPFailure: Timeout/Error

    HTTPSuccess --> ResultQueued: Create result
    HTTPFailure --> ResultQueued: Create result

    ResultQueued --> ResultProcessed: Processor handles
    ResultProcessed --> SavedToDB: Insert poll_result
    SavedToDB --> Idle: Complete

Project Monitoring Lifecycle

graph TD
    A[Project created with enabled=true] --> B[Coordinator.AddProject]
    B --> C[Scheduler adds to active map]
    C --> D[Create time.Ticker]
    D --> E[Start monitoring goroutine]

    E --> F{Ticker fires}
    F -->|Every interval| G[Emit ProjectTask]
    G --> H[Task Queue]
    H --> I[Executor processes]
    I --> J[Result saved]
    J --> F

    K[User disables project] --> L[Coordinator.RemoveProject]
    L --> M[Scheduler removes from map]
    M --> N[Stop ticker]
    N --> O[Stop goroutine]
    O --> P[Monitoring stopped]

    style A fill:#4CAF50
    style K fill:#F44336
    style E fill:#2196F3
    style I fill:#FF9800

Components Deep Dive

Coordinator (poller.go)

The central orchestrator of the polling system.

classDiagram
    class Coordinator {
        -scheduler Scheduler
        -executor ExecutorPool
        -projectRepo ProjectRepository
        -pollResultRepo PollResultRepository
        -resultQueue chan
        -maxConcurrentHTTP int

        +Start() error
        +Stop() error
        +AddProject(project) error
        +RemoveProject(id) error
        +ReloadProjects() error
        +GetStatus() Status

        -processResults()
        -reloadProjectsPeriodically()
    }

    Coordinator --> Scheduler
    Coordinator --> ExecutorPool
    Coordinator --> ProjectRepository
    Coordinator --> PollResultRepository

Responsibilities:

  1. Lifecycle Management

    • Start/stop all components
    • Graceful shutdown coordination
  2. Result Processing

    • Consume results from result queue
    • Update statistics (success/failure counts)
    • Save to database
  3. Project Reloading

    • Periodically reload projects from DB (every 5 min)
    • Ensures new projects are picked up
    • Syncs changes made directly to DB
  4. Public API

    • Interface for adding/removing projects
    • Get system status

Scheduler (scheduler.go)

Manages the timing and scheduling of polling tasks.

graph TD
    A[Scheduler] --> B[Active Projects Map]
    B --> C[Project 1]
    B --> D[Project 2]
    B --> E[Project N]

    C --> C1[Ticker: 5min]
    D --> D1[Ticker: 1min]
    E --> E1[Ticker: 30sec]

    C1 -->|Tick| F[ProjectTask]
    D1 -->|Tick| F
    E1 -->|Tick| F

    F --> G[Task Queue Channel]

    style A fill:#64B5F6
    style B fill:#FFE082
    style G fill:#A5D6A7

Internal Structure:

type Scheduler struct {
    activeProjects map[uint]*ProjectMonitor  // Project ID → Monitor
    taskQueue      chan ProjectTask
    projectRepo    ProjectRepository
}

type ProjectMonitor struct {
    Project *domain.Project
    Ticker  *time.Ticker
    StopCh  chan struct{}
}

How Projects are Monitored:

sequenceDiagram
    participant S as Scheduler
    participant PM as ProjectMonitor
    participant T as Ticker
    participant Q as Task Queue

    Note over S: AddProject(project)
    S->>PM: Create monitor
    S->>T: NewTicker(interval)
    S->>PM: Start goroutine

    loop Monitoring active
        T->>PM: Tick!
        PM->>Q: Send ProjectTask
    end

    Note over S: RemoveProject(id)
    S->>PM: Send stop signal
    PM->>T: Stop ticker
    PM->>PM: Exit goroutine

Key Operations:

  • AddProject - Adds/updates a project for monitoring
  • RemoveProject - Stops monitoring a project
  • ReloadProjects - Reloads all enabled projects from DB

Executor Pool (executor.go)

Executes HTTP requests with semaphore-based concurrency control.

graph TB
    subgraph "Executor Pool"
        A[Task Queue] --> B[Task Consumer]

        B -->|Create| C[Goroutine 1]
        B -->|Create| D[Goroutine 2]
        B -->|Create| E[Goroutine 3]
        B -->|Create| F[Goroutine ...]

        C --> G{HTTP Semaphore<br/>500 slots}
        D --> G
        E --> G
        F --> G

        G -->|Slot 1| H[HTTP Request]
        G -->|Slot 2| I[HTTP Request]
        G -->|Slot 500| J[HTTP Request]
        G -->|Wait...| K[Queued Goroutines]

        H --> L[Result]
        I --> L
        J --> L

        L --> M[Result Queue]
    end

    style G fill:#FF9800
    style H fill:#4CAF50
    style I fill:#4CAF50
    style J fill:#4CAF50
    style K fill:#FFC107

Semaphore Pattern:

// Semaphore controls HTTP concurrency
httpSemaphore := make(chan struct{}, 500)

// For each task
go func(task ProjectTask) {
    // 1. Acquire semaphore slot (blocks if full)
    httpSemaphore <- struct{}{}
    defer func() { <-httpSemaphore }() // Release on exit

    // 2. Execute HTTP request (only 500 concurrent)
    statusCode, err := doHTTPRequest(task)

    // 3. Create result
    result := PollingResult{...}
    resultQueue <- result
}(task)

// Goroutines are CHEAP (~2KB)
// HTTP connections are EXPENSIVE (~100KB + file descriptor)

Resource Comparison:

30,000 projects with fixed workers:
- 30,000 goroutines × 100KB = 3GB RAM ❌

30,000 projects with semaphore:
- 30,000 goroutines × 2KB = 60MB
- 500 HTTP active × 100KB = 50MB
- Total: ~110MB RAM ✅
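
A quick arithmetic check of these estimates, under the stated assumptions of ~2KB per parked goroutine and ~100KB per in-flight HTTP request (`estimateRAMKB` is an illustrative helper, not project code):

```go
package main

import "fmt"

// estimateRAMKB multiplies out the README's back-of-envelope memory model.
func estimateRAMKB(goroutines, goroutineKB, activeHTTP, httpKB int) int {
	return goroutines*goroutineKB + activeHTTP*httpKB
}

func main() {
	// Fixed-worker model: every worker goroutine holds HTTP state (~100KB).
	workers := estimateRAMKB(30000, 100, 0, 0)
	// Semaphore model: 30,000 cheap goroutines + 500 active HTTP requests.
	sem := estimateRAMKB(30000, 2, 500, 100)
	fmt.Printf("fixed workers: ~%d MB\n", workers/1024) // ~2929 MB (~3GB)
	fmt.Printf("semaphore:     ~%d MB\n", sem/1024)     // ~107 MB (~110MB)
}
```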

HTTP Requester (http_requester.go)

Abstraction for making HTTP requests.

graph LR
    A[HTTPRequester] --> B[DoRequest]
    B --> C{Method?}
    C -->|GET| D[http.Get]
    C -->|POST| E[http.Post]
    C -->|HEAD| F[http.Head]
    C -->|Other| G[http.NewRequest]

    D --> H{Status code?}
    E --> H
    F --> H
    G --> H

    H -->|== Expected| I[Success]
    H -->|!= Expected| J[Failure]

    style A fill:#81C784
    style I fill:#4CAF50
    style J fill:#F44336

Validation:

// Request is considered successful if:
1. No network error
2. Status code == project.ExpectedStatus

// Examples:
Project expects 200, gets 200 → Success
Project expects 200, gets 404 → Failure
Project expects 404, gets 404 → Success

Data Flow

Complete Polling Cycle

flowchart TD
    Start([Project enabled in DB]) --> A[Scheduler loads project]
    A --> B[Create ticker with interval]
    B --> C{Wait for tick}

    C -->|Interval elapsed| D[Create ProjectTask]
    D --> E[Send to taskQueue]
    E --> F[Task consumer receives]

    F --> G[Spawn goroutine]
    G --> H{Semaphore available?}
    H -->|Yes| I[Acquire slot]
    H -->|No| J[Wait in queue]
    J --> I

    I --> K[Execute HTTP request]
    K --> L{Network call}
    L -->|Success| M[Got status code]
    L -->|Error| N[Got error]

    M --> O{Status == Expected?}
    O -->|Yes| P[Success=true]
    O -->|No| Q[Success=false]
    N --> Q

    P --> R[Create PollingResult]
    Q --> R

    R --> S[Release semaphore slot]
    S --> T[Send to resultQueue]
    T --> U[Result processor receives]

    U --> V[Update coordinator stats]
    V --> W[Save to poll_results table]
    W --> X[Log result]
    X --> C

    style K fill:#2196F3
    style P fill:#4CAF50
    style Q fill:#F44336
    style W fill:#9C27B0

Data Structures

classDiagram
    class ProjectTask {
        +Project *domain.Project
    }

    class PollingResult {
        +ProjectID uint
        +Success bool
        +StatusCode int
        +ResponseTime Duration
        +Error error
        +Timestamp Time
    }

    class PollResultData {
        +ProjectID uint
        +Status string
        +StatusCode *int
        +ResponseTimeMs *int
        +ErrorMessage string
        +PolledAt Time
    }

    ProjectTask --> PollingResult : Executor transforms
    PollingResult --> PollResultData : Processor transforms

Channel Communication

graph LR
    A[Scheduler] -->|ProjectTask| B((Task Queue))
    B -->|Buffer: 1000| C[Executor]
    C -->|PollingResult| D((Result Queue))
    D -->|Buffer: 2000| E[Result Processor]
    E --> F[(Database)]

    style B fill:#FFE082
    style D fill:#FFE082

Buffer Sizes:

  • Task Queue: 1000 - Handles bursts of tasks from scheduler
  • Result Queue: 2000 - Handles bursts of results from executor

Why these sizes?

  • Allows system to handle spikes without blocking
  • Large enough for 1-2 minutes of polling under load
  • Small enough to not waste memory

How It Works - Step by Step

1. System Initialization

sequenceDiagram
    participant Main
    participant Coord as Coordinator
    participant Exec as Executor
    participant Sched as Scheduler
    participant DB

    Main->>Coord: NewCoordinator(repos, 500)
    Note over Coord: Creates channels<br/>Creates components

    Main->>Coord: Start()
    Coord->>Exec: Start()
    Note over Exec: Launches task consumer

    Coord->>Sched: Run()
    Sched->>DB: GetEnabled()
    DB-->>Sched: [Projects]

    loop For each project
        Sched->>Sched: AddProject(project)
        Note over Sched: Creates ticker<br/>Starts goroutine
    end

    Coord->>Coord: Start processResults()
    Coord->>Coord: Start reloadProjectsPeriodically()

    Coord-->>Main: ✅ System running

2. Adding a New Project

sequenceDiagram
    participant User
    participant API
    participant ProjectSvc
    participant Coord
    participant Sched

    User->>API: POST /api/projects
    API->>ProjectSvc: CreateProject(...)
    ProjectSvc->>ProjectSvc: Validate data
    ProjectSvc->>ProjectSvc: Save to DB

    alt Project is enabled
        ProjectSvc->>Coord: AddProject(project)
        Coord->>Sched: AddProject(project)
        Sched->>Sched: Create ticker
        Sched->>Sched: Start monitoring
        Note over Sched: Polling begins immediately
    end

    ProjectSvc-->>API: Project created
    API-->>User: 201 Created

3. Polling Execution

sequenceDiagram
    participant T as Ticker
    participant S as Scheduler
    participant Q as Task Queue
    participant E as Executor
    participant Sem as Semaphore
    participant HTTP as HTTP Server
    participant RQ as Result Queue

    T->>S: Tick! (5 minutes elapsed)
    S->>Q: ProjectTask{project}

    Q->>E: Task consumed
    E->>E: Create goroutine

    par Goroutine execution
        E->>Sem: Acquire slot

        alt Slot available
            Sem-->>E: Acquired ✅
            E->>HTTP: GET /endpoint
            HTTP-->>E: 200 OK (350ms)
            E->>Sem: Release slot
        else No slots
            Sem-->>E: Waiting...
            Note over E: Goroutine blocks here
            Sem-->>E: Acquired ✅ (when available)
            E->>HTTP: GET /endpoint
            HTTP-->>E: 503 Error
            E->>Sem: Release slot
        end

        E->>RQ: PollingResult
    end

4. Result Processing

flowchart LR
    A[Result Queue] --> B{Result Processor}

    B --> C{Success?}
    C -->|Yes| D[successRequests++]
    C -->|No| E[failedRequests++]

    D --> F[totalRequests++]
    E --> F

    F --> G[Build PollResultData]
    G --> H{Save to DB}
    H -->|Success| I[Log: ✅ Project polled]
    H -->|Error| J[Log: ⚠️ Failed to save]

    I --> K[Result processed]
    J --> K

    style D fill:#4CAF50
    style E fill:#F44336
    style H fill:#9C27B0

Configuration

Environment Variables

# Polling system configuration
MAX_CONCURRENT_HTTP=500    # Max concurrent HTTP requests

# Recommendations by environment:

# Development (local machine)
MAX_CONCURRENT_HTTP=50

# Free tier (512MB RAM)
MAX_CONCURRENT_HTTP=100

# Small EC2 (2GB RAM)
MAX_CONCURRENT_HTTP=500

# Medium EC2 (4GB RAM)
MAX_CONCURRENT_HTTP=1000

# Large EC2 (8GB+ RAM)
MAX_CONCURRENT_HTTP=2000

Coordinator Configuration

coordinator := NewCoordinator(
    projectRepo,
    pollResultRepo,
    500, // maxConcurrentHTTP
)

// Internal settings:
reloadInterval:  5 * time.Minute  // How often to reload projects from DB
taskQueue:       buffer 1000       // Task queue buffer size
resultQueue:     buffer 2000       // Result queue buffer size

Capacity Planning

Formula: Projects = (MaxConcurrentHTTP × 60 × Interval) / AvgResponseTime

Example with MaxConcurrentHTTP=500:
- Interval: 5 minutes
- Avg response time: 3 seconds

Projects = (500 × 60 × 5) / 3
         = 150,000 / 3
         = 50,000 polls per interval
         ≈ 10,000 projects (keeping a ~5× safety margin)
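
The formula can be encoded directly (`capacityPerInterval` is a hypothetical helper for checking the arithmetic, not project code):

```go
package main

import "fmt"

// capacityPerInterval applies the capacity-planning formula: how many polls
// fit in one interval when maxConcurrent requests run in parallel and each
// takes avgResponseSec seconds. intervalMin is the polling interval in minutes.
func capacityPerInterval(maxConcurrent, intervalMin, avgResponseSec int) int {
	return maxConcurrent * 60 * intervalMin / avgResponseSec
}

func main() {
	// 500 concurrent slots, 5-minute interval, 3s average response time.
	fmt.Println("polls per interval:", capacityPerInterval(500, 5, 3)) // 50000
}
```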

Monitoring

System Status

status := coordinator.GetStatus()

fmt.Printf("Active Projects: %d\n", status.ActiveProjects)
fmt.Printf("Total Requests: %d\n", status.TotalRequests)
fmt.Printf("Success: %d\n", status.SuccessRequests)
fmt.Printf("Failed: %d\n", status.FailedRequests)
fmt.Printf("Active HTTP: %d\n", status.Workers)

Health Indicators

graph TD
    A{System Health} --> B{Active HTTP}
    B -->|< 50% max| C[✅ Healthy - Under capacity]
    B -->|50-90% max| D[⚠️ Warning - High load]
    B -->|> 90% max| E[🔴 Critical - Saturated]

    A --> F{Queue Sizes}
    F -->|< 100| G[✅ Healthy]
    F -->|100-500| H[⚠️ Warning]
    F -->|> 500| I[🔴 Critical - Backlog]

    style C fill:#4CAF50
    style D fill:#FF9800
    style E fill:#F44336
    style G fill:#4CAF50
    style H fill:#FF9800
    style I fill:#F44336

Logging

The system logs all important events:

🚀 Starting Polling Coordinator
🚀 Starting ExecutorPool with max 500 concurrent HTTP requests
   (Goroutines created on-demand, no fixed worker limit)
✅ ExecutorPool started successfully
🚀 Starting Scheduler
🔄 Reloading 3 enabled projects
🚀 Started monitoring project 1 (Project Name) every 5m0s
✅ Scheduler started with 3 active monitors
✅ Polling Coordinator started successfully
📊 Result processor started
🔄 Periodic project reloader started (every 5m0s)

Making GET request to https://example.com (Project Name)
✅ Successful request to https://example.com - Status: 200 - Size: 1234 bytes
✅ Project 1 polled successfully in 234ms

🛑 Stopping Polling Coordinator...
📊 Task consumer shutting down
✅ ExecutorPool stopped (executed 1234 total tasks)
✅ Polling Coordinator stopped

Advanced Topics

Backpressure Handling

graph TD
    A[Tasks arriving] --> B{Task queue full?}
    B -->|No| C[Enqueue task]
    B -->|Yes| D[Scheduler blocks]
    D --> E[Natural backpressure]
    E --> F[Ticker delays]
    F --> G[System self-regulates]

    C --> H[Executor consumes]
    H --> I{Semaphore full?}
    I -->|Yes| J[Goroutine waits]
    I -->|No| K[Execute HTTP]

    J --> L[Automatic queuing]
    L --> M[Processed when slot available]

    style E fill:#FF9800
    style L fill:#4CAF50

What happens under extreme load:

  1. Tasks arrive faster than processing
  2. Task queue fills up (1000 buffer)
  3. Scheduler blocks on channel send
  4. Tick emission delays naturally (a new task can't be sent until the old one is queued)
  5. System reaches equilibrium

No crashes, no panics, just slower polling.
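
The blocking behavior in step 3 comes for free from Go's buffered channels: once the buffer is full, a plain send blocks the producer. The tiny demo below probes fullness with a non-blocking send instead of actually blocking (buffer shrunk from 1000 to 2 for demonstration):

```go
package main

import "fmt"

func main() {
	taskQueue := make(chan int, 2) // production buffer is 1000
	taskQueue <- 1
	taskQueue <- 2 // buffer now full
	select {
	case taskQueue <- 3:
		fmt.Println("enqueued")
	default:
		fmt.Println("queue full: a plain send here would block the scheduler")
	}
}
```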

Graceful Shutdown

sequenceDiagram
    participant Main
    participant Coord
    participant Sched
    participant Exec
    participant Goroutines
    participant DB

    Main->>Coord: Stop()
    Coord->>Coord: cancel context

    Coord->>Sched: Shutdown()
    Sched->>Sched: Stop all tickers
    Sched->>Sched: Stop all monitors
    Sched-->>Coord: Stopped ✅

    Coord->>Exec: Shutdown()
    Exec->>Goroutines: Context canceled
    Note over Goroutines: Finish current HTTP<br/>requests (up to 30s)
    Goroutines-->>Exec: All complete
    Exec-->>Coord: Stopped ✅

    Coord->>Coord: Close result queue
    Coord->>Coord: Wait for processors

    Note over Coord: Process remaining results
    Coord->>DB: Save final results

    Coord-->>Main: Shutdown complete ✅

Guarantees:

  • ✅ All in-flight HTTP requests complete
  • ✅ All results are processed and saved
  • ✅ No data loss
  • ⚠️ May take up to 30-60 seconds (HTTP timeout)

Performance Characteristics

Latency Breakdown

Component                    Latency
──────────────────────────────────────
Ticker fires                 ~10µs
Create ProjectTask           ~1µs
Enqueue to taskQueue         ~1µs
Task consumer picks up       ~10µs
Create goroutine             ~10µs
Wait on semaphore            0 - 30s (depends on load)
HTTP request                 50ms - 10s (depends on target)
Create PollingResult         ~1µs
Enqueue to resultQueue       ~1µs
Result processor picks up    ~10µs
Save to database            ~1-10ms

Bottlenecks:

  1. HTTP request time - Depends on target server
  2. Semaphore wait time - Depends on concurrent load
  3. Database insert - At 10,000 inserts/minute scale

Throughput

With MAX_CONCURRENT_HTTP=500:

Best case (fast responses, 100ms avg):
= 500 / 0.1s = 5,000 requests/second
= 300,000 requests/minute
= 18M requests/hour

Typical case (normal responses, 3s avg):
= 500 / 3s = 166 requests/second
= 10,000 requests/minute
= 600,000 requests/hour

Worst case (slow responses, 10s avg):
= 500 / 10s = 50 requests/second
= 3,000 requests/minute
= 180,000 requests/hour

Troubleshooting

Common Issues

1. Tasks not executing

graph TD
    A{Tasks not executing} --> B{Check scheduler}
    B -->|Not running| C[Start coordinator]
    B -->|Running| D{Check projects}

    D -->|No projects| E[Add enabled projects]
    D -->|Have projects| F{Check executor}

    F -->|Semaphore full| G[All slots in use<br/>Increase MAX_CONCURRENT_HTTP]
    F -->|Task queue full| H[Executor too slow<br/>Increase concurrency]

    style G fill:#FF9800
    style H fill:#F44336

2. High memory usage

Check: Number of goroutines

If > 100,000 goroutines:
→ Tasks arriving much faster than processing
→ Either increase MAX_CONCURRENT_HTTP
→ Or reduce polling frequency

If < 10,000 goroutines but high RAM:
→ Memory leak elsewhere (not executor)

3. Slow polling

Check: ActiveHTTP vs MaxConcurrentHTTP

If ActiveHTTP == MaxConcurrentHTTP always:
→ System saturated
→ Tasks waiting on semaphore
→ Solution: Increase MAX_CONCURRENT_HTTP

If ActiveHTTP < MaxConcurrentHTTP:
→ Not saturated
→ Slowness is in target servers
→ Or database writes

Code Examples

Creating a Coordinator

// In main.go or setup
coordinator := polling.NewCoordinator(
    projectRepo,
    pollResultRepo,
    500, // max concurrent HTTP requests
)

// Start the system
if err := coordinator.Start(); err != nil {
    log.Fatal(err)
}

// ... application runs ...

// Graceful shutdown
coordinator.Stop()

Adding a Project

// When user creates a project
project, err := domain.NewProject(...)
if err != nil {
    return err
}

// Save to database
projectRepo.Create(project)

// Add to polling (if enabled)
if project.Enabled {
    coordinator.AddProject(project)
    // Polling starts immediately
}

Removing a Project

// When user deletes a project
coordinator.RemoveProject(projectID)
// Polling stops immediately

// Then delete from database
projectRepo.Delete(projectID)

Manual Reload

// Force reload all projects from database
coordinator.ReloadProjects()

// This happens automatically every 5 minutes
// But can be triggered manually if needed

Summary

Architecture Benefits

Scalable - Handles 10,000s of projects on one machine
Efficient - Minimal resource waste
Reliable - No task drops, graceful degradation
Simple - No external dependencies (NATS, Redis, etc.)
Observable - Rich metrics and logging
Resilient - Graceful shutdown, no data loss

Key Innovations

  1. Semaphore pattern - Separates cheap (goroutines) from expensive (HTTP)
  2. On-demand execution - No fixed worker pool
  3. Connection pooling - HTTP keep-alive for efficiency
  4. Natural backpressure - System self-regulates under load

When to Evolve

Stay with this architecture:

  • < 10,000 projects
  • Single machine deployment
  • 95-99% uptime acceptable

Migrate to distributed:

  • > 20,000 projects
  • Need 99.99% uptime
  • Multi-region deployment
  • Have ops team to manage infrastructure

Architecture Diagram - Complete System

graph TB
    subgraph "External"
        U[User/API]
        DB[(PostgreSQL)]
    end

    subgraph "Coordinator"
        direction TB

        subgraph "Scheduler"
            PM1[Project Monitor 1<br/>Ticker: 5min]
            PM2[Project Monitor 2<br/>Ticker: 1min]
            PMN[Project Monitor N<br/>Ticker: 30s]
        end

        TQ[Task Queue<br/>Buffer: 1000]

        subgraph "Executor Pool"
            TC[Task Consumer]

            subgraph "Goroutines"
                G1[Goroutine 1]
                G2[Goroutine 2]
                GN[Goroutine N]
            end

            SEM{HTTP Semaphore<br/>500 slots}

            H1[HTTP Request]
            H2[HTTP Request]
            H500[HTTP Request]
        end

        RQ[Result Queue<br/>Buffer: 2000]
        RP[Result Processor]

        PM1 --> TQ
        PM2 --> TQ
        PMN --> TQ

        TQ --> TC
        TC -.->|spawn| G1
        TC -.->|spawn| G2
        TC -.->|spawn| GN

        G1 --> SEM
        G2 --> SEM
        GN --> SEM

        SEM -->|acquire| H1
        SEM -->|acquire| H2
        SEM -->|acquire| H500

        H1 --> RQ
        H2 --> RQ
        H500 --> RQ

        RQ --> RP
    end

    U -->|Create/Update| DB
    DB -.->|Load| PM1
    DB -.->|Load| PM2
    RP -->|Save| DB

    style SEM fill:#FF9800
    style H1 fill:#4CAF50
    style H2 fill:#4CAF50
    style H500 fill:#4CAF50
    style RP fill:#BA68C8
    style DB fill:#81C784

FAQs

Q: Why not use a message queue (NATS/RabbitMQ)?

A: For a monolith, Go channels are:

  • Faster (microsecond latency vs milliseconds)
  • Simpler (no external dependencies)
  • Sufficient (handles 10,000s of projects)

Use message queue when:

  • Multiple machines
  • Need persistence across restarts
  • Distributed architecture

Q: What if goroutine count grows to millions?

A: The system has natural backpressure:

  1. Task queue has buffer (1000)
  2. When full, scheduler blocks
  3. Can't create more goroutines than tasks
  4. Maximum = Projects × concurrent polls

Realistic max: ~30,000 goroutines (60MB RAM)

Q: Can HTTP semaphore size change at runtime?

A: Yes! executor.ScaleTo(newSize) can adjust it dynamically.

Q: What happens if database is down?

A:

  • Polls continue executing
  • Results queue up in memory (buffer 2000)
  • When DB recovers, results are saved
  • If queue fills, results are dropped (logged)
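
The drop-on-full behavior described here corresponds to a non-blocking send via select/default (`enqueueResult` is a hypothetical name, not the repository's function):

```go
package main

import "fmt"

// enqueueResult tries a non-blocking send to the result queue and reports
// a drop when the buffer is full, so the caller can log a warning.
func enqueueResult(q chan int, r int) (dropped bool) {
	select {
	case q <- r:
		return false
	default:
		return true // queue full: drop rather than block polling
	}
}

func main() {
	resultQueue := make(chan int, 2) // production buffer is 2000
	for i := 1; i <= 3; i++ {
		if enqueueResult(resultQueue, i) {
			fmt.Printf("result %d dropped (queue full)\n", i)
		}
	}
}
```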

Q: What happens if a target server is down?

A:

  • HTTP request times out (30s)
  • Error result created
  • Semaphore slot released
  • Next task proceeds
  • Failure logged and saved

Q: How to test with many projects?

// Create test projects
for i := 0; i < 1000; i++ {
    project := &domain.Project{
        Name: fmt.Sprintf("Test %d", i),
        URL: "https://httpbin.org/delay/1",
        Interval: 1,
        Unit: "minutes",
        Enabled: true,
    }
    coordinator.AddProject(project)
}

// Monitor stats
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
    stats := coordinator.GetStatus()
    fmt.Printf("Active: %d, Total: %d\n",
        stats.Workers, stats.TotalRequests)
}

Conclusion

The polling system uses a semaphore-based executor pattern to achieve:

  • High scalability without complex distributed systems
  • Efficient resource usage by separating concerns
  • Reliable execution with natural backpressure
  • Simple operations with no external dependencies

It's the optimal architecture for a single-instance polling system that needs to handle thousands of projects efficiently.

Perfect for: MVP, small-to-medium SaaS, cost-effective monitoring

Evolve when: Need 99.99% uptime, multi-region, or 50,000+ projects

About

HTTP endpoint monitoring and polling with Go, React, and PostgreSQL. Scalable system with dynamic workers, real-time metrics, and JWT authentication.
