The FlinkDotNet Observability Framework provides monitoring, logging, metrics collection, and distributed tracing following Apache Flink 2.0 standards. It mirrors Apache Flink's observability patterns while offering native .NET integration.
- Architecture
- Components
- Configuration
- Metrics
- Tracing
- Logging
- Health Monitoring
- Integration Guide
- Best Practices
- Troubleshooting
The observability framework is built on four core pillars, aligned with Apache Flink 2.0:
```
┌─────────────────────────────────────────────────────────────┐
│                  FlinkDotNet Observability                  │
├─────────────────┬─────────────────┬─────────────────┬───────┤
│     Metrics     │     Tracing     │     Logging     │Health │
│   Collection    │   Distributed   │   Structured    │Checks │
│                 │                 │                 │       │
│ • Throughput    │ • End-to-End    │ • Contextual    │• Job  │
│ • Latency       │ • Correlation   │ • Searchable    │• Op.  │
│ • Backpressure  │ • Performance   │ • Standardized  │• Res. │
│ • Errors        │ • Debugging     │ • JSON Format   │• Net. │
└─────────────────┴─────────────────┴─────────────────┴───────┘
```
- Apache Flink Compatibility: All metrics, naming conventions, and patterns follow Apache Flink 2.0 standards
- OpenTelemetry Native: Built on OpenTelemetry for industry-standard observability
- Performance First: Minimal overhead with async operations and efficient data structures
- Production Ready: Comprehensive error handling, circuit breakers, and graceful degradation
Provides comprehensive metrics collection following Apache Flink metric patterns:
```csharp
public interface IFlinkMetrics
{
    void RecordIncomingRecord(string operatorName, string taskId);
    void RecordOutgoingRecord(string operatorName, string taskId);
    void RecordLatency(string operatorName, string taskId, TimeSpan latency);
    void RecordBackpressure(string operatorName, string taskId, TimeSpan duration);
    void RecordCheckpointDuration(string jobId, TimeSpan duration);
    // ... additional methods
}
```

Enables distributed tracing across job components:
```csharp
public interface IFlinkTracing
{
    Activity? StartOperatorSpan(string operatorName, string taskId);
    Activity? StartRecordProcessingSpan(string operatorName, string taskId, string recordId);
    Activity? StartCheckpointSpan(string jobId, long checkpointId);
    // ... additional methods
}
```

Provides structured logging with rich context:
```csharp
public interface IFlinkLogger
{
    void LogOperatorLifecycle(LogLevel level, string operatorName, string taskId, string lifecycle);
    void LogRecordProcessing(LogLevel level, string operatorName, string taskId, string action);
    void LogCheckpoint(LogLevel level, string jobId, long checkpointId, string phase);
    // ... additional methods
}
```

Comprehensive health monitoring for all components:
```csharp
public interface IFlinkHealthMonitor
{
    Task<FlinkHealthCheckResult> CheckOperatorHealthAsync(string operatorName, string taskId);
    Task<FlinkHealthCheckResult> CheckJobHealthAsync(string jobId);
    Task<Dictionary<string, FlinkHealthCheckResult>> CheckOverallHealthAsync();
    // ... additional methods
}
```

Add FlinkDotNet observability to your application:
```csharp
// Program.cs
using FlinkDotNet.Core.Observability.Extensions;

var builder = Host.CreateApplicationBuilder(args);

// Add FlinkDotNet observability with configuration
builder.Services.AddFlinkObservability("FlinkJobSimulator", options =>
{
    options.EnableConsoleMetrics = true;
    options.EnableConsoleTracing = true;
    options.EnableOperatorMonitoring = true;
    options.EnableJobMonitoring = true;
    options.MetricsIntervalSeconds = 10;
    options.HealthCheckIntervalSeconds = 30;
});

var app = builder.Build();
```

For production environments:
```csharp
builder.Services.AddFlinkObservability("FlinkJobSimulator", options =>
{
    // Production-ready configuration
    options.EnablePrometheusMetrics = true;
    options.EnableJaegerTracing = true;
    options.JaegerEndpoint = "http://jaeger:14268/api/traces";

    // Console outputs disabled in production
    options.EnableConsoleMetrics = false;
    options.EnableConsoleTracing = false;

    // Monitoring configuration
    options.EnableOperatorMonitoring = true;
    options.EnableJobMonitoring = true;
    options.EnableNetworkMonitoring = true;
    options.EnableStateBackendMonitoring = true;

    // Intervals
    options.MetricsIntervalSeconds = 5;
    options.HealthCheckIntervalSeconds = 15;
});
```

Configure observability through environment variables:
```bash
# OpenTelemetry Configuration
OTEL_EXPORTER_OTLP_ENDPOINT=http://otlp-collector:4317
OTEL_SERVICE_NAME=FlinkJobSimulator
OTEL_SERVICE_VERSION=1.0.0

# Flink-specific Configuration
FLINK_OBSERVABILITY_METRICS_INTERVAL=10
FLINK_OBSERVABILITY_HEALTH_INTERVAL=30
FLINK_OBSERVABILITY_ENABLE_DETAILED_TRACING=true
```

The framework implements all standard Apache Flink metrics:
- `flink_taskmanager_job_task_operator_numRecordsIn`: Records received by operator
- `flink_taskmanager_job_task_operator_numRecordsOut`: Records emitted by operator
- `flink_taskmanager_job_task_operator_numBytesIn`: Bytes received by operator
- `flink_taskmanager_job_task_operator_numBytesOut`: Bytes emitted by operator
- `flink_taskmanager_job_latency`: Processing latency histogram
- `flink_taskmanager_job_task_backPressuredTimeMsPerSecond`: Backpressure time
- `flink_jobmanager_job_lastCheckpointDuration`: Checkpoint duration
- `flink_jobmanager_job_lastCheckpointSize`: Checkpoint size
- `flink_jobmanager_job_numRestarts`: Number of job restarts
- `flink_taskmanager_job_task_buffers_inPoolUsage`: Buffer pool usage
- `flink_taskmanager_job_task_operator_managedMemoryUsed`: Memory usage
- `flink_taskmanager_job_task_operator_watermarkLag`: Watermark lag
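Record and byte counters in the list above are monotonic, so dashboards derive throughput from successive samples rather than reading the raw value. A minimal sketch of that calculation — the same arithmetic PromQL's `rate()` applies to a counter such as `flink_taskmanager_job_task_operator_numRecordsIn` — shown in Python for brevity; the function name and reset handling are illustrative, not part of the framework:

```python
def records_per_second(count_t0: int, count_t1: int, interval_s: float) -> float:
    """Derive throughput from two samples of a monotonic counter taken interval_s apart."""
    if interval_s <= 0:
        raise ValueError("interval must be positive")
    delta = count_t1 - count_t0
    # Counters reset to zero when a task restarts; a negative delta signals a
    # reset, so fall back to the latest absolute value as the best estimate.
    if delta < 0:
        delta = count_t1
    return delta / interval_s

print(records_per_second(10_000, 25_000, 10.0))  # 1500.0
```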
```csharp
public class MyOperator : IOperatorLifecycle
{
    private readonly IFlinkMetrics _metrics;
    private readonly IFlinkLogger _logger;
    private readonly IFlinkTracing _tracing;
    private readonly string _taskId;

    public void ProcessRecord(MyRecord record)
    {
        using var activity = _tracing.StartRecordProcessingSpan("MyOperator", _taskId, record.Id);
        var startTime = DateTime.UtcNow;

        try
        {
            // Record incoming data
            _metrics.RecordIncomingRecord("MyOperator", _taskId);
            _metrics.RecordBytesIn("MyOperator", _taskId, record.Size);

            // Process the record
            var result = ProcessBusinessLogic(record);

            // Record outgoing data
            _metrics.RecordOutgoingRecord("MyOperator", _taskId);
            _metrics.RecordBytesOut("MyOperator", _taskId, result.Size);

            // Record processing latency
            var latency = DateTime.UtcNow - startTime;
            _metrics.RecordLatency("MyOperator", _taskId, latency);

            _logger.LogRecordProcessing(LogLevel.Debug, "MyOperator", _taskId,
                "processed", 1, latency);
        }
        catch (Exception ex)
        {
            _metrics.RecordError("MyOperator", _taskId, ex.GetType().Name);
            _logger.LogError("MyOperator", _taskId, ex, "processing");
            activity?.RecordException(ex);
            throw;
        }
    }
}
```

FlinkDotNet tracing follows Apache Flink's trace correlation patterns:
```
Job Submission
├── Operator Chain Span
│   ├── Record Processing Span
│   │   ├── State Operation Span
│   │   └── Network Communication Span
│   └── Checkpoint Span
└── Job Completion Span
```
```csharp
// Start a job-level trace
using var jobSpan = _tracing.StartJobSpan("MyStreamingJob");

// Propagate context to operators ("operator" is a reserved word in C#)
foreach (var op in operators)
{
    var operatorContext = _tracing.GetCurrentTraceContext();
    op.SetTraceContext(operatorContext);
}
```

```csharp
public async Task ProcessWithTracing()
{
    using var span = _tracing.StartOperatorSpan("CustomOperator", "task-1");

    // Add contextual information
    _tracing.AddSpanAttribute("record.count", recordCount);
    _tracing.AddSpanAttribute("processing.mode", "streaming");

    // Add events for significant occurrences
    _tracing.AddSpanEvent("checkpoint.triggered", new Dictionary<string, object>
    {
        ["checkpoint.id"] = checkpointId,
        ["trigger.reason"] = "timer"
    });

    try
    {
        await ProcessData();
    }
    catch (Exception ex)
    {
        _tracing.RecordSpanError(ex, "Processing failed during data transformation");
        throw;
    }
}
```

All logs follow a consistent JSON structure:
```json
{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "level": "INFO",
  "message": "Record processing completed in operator MyOperator, task task-1: 1000 records in 150ms",
  "flink.operator.name": "MyOperator",
  "flink.task.id": "task-1",
  "flink.job.id": "job-123",
  "flink.record.count": 1000,
  "flink.processing.time.ms": 150,
  "flink.event.type": "record_processing",
  "service.name": "FlinkJobSimulator",
  "service.version": "1.0.0",
  "flink.version": "2.0"
}
```

```csharp
_logger.LogOperatorLifecycle(LogLevel.Information, "MyOperator", "task-1", "started");
_logger.LogOperatorLifecycle(LogLevel.Information, "MyOperator", "task-1", "stopped");

_logger.LogRecordProcessing(LogLevel.Debug, "MyOperator", "task-1", "processed",
    recordCount: 100, processingTime: TimeSpan.FromMilliseconds(50));

_logger.LogCheckpoint(LogLevel.Information, "job-123", 1001, "completed",
    duration: TimeSpan.FromSeconds(2), sizeBytes: 1024000);

_logger.LogPerformance(LogLevel.Information, "MyOperator", "task-1",
    "throughput", 1000.0, "records/sec");
```

Create scoped loggers with additional context:
```csharp
var scopedLogger = _logger.WithContext(new Dictionary<string, object>
{
    ["batch.id"] = batchId,
    ["partition.id"] = partitionId,
    ["processing.mode"] = "exactly-once"
});

scopedLogger.LogRecordProcessing(LogLevel.Information, "MyOperator", "task-1", "batch_completed");
```

- Memory usage monitoring
- CPU utilization tracking
- Error rate analysis
- Processing latency validation
- Execution status monitoring
- Checkpoint success rate tracking
- Throughput validation
- Resource consumption analysis
- Network communication validation
- State backend accessibility
- Resource availability checking
```csharp
// Register custom health checks
healthMonitor.RegisterCustomHealthCheck("kafka-connectivity", async cancellationToken =>
{
    try
    {
        // Test Kafka connectivity
        await TestKafkaConnection();
        return new FlinkHealthCheckResult
        {
            Status = FlinkHealthStatus.Healthy,
            ComponentName = "Kafka",
            Description = "Kafka connectivity is healthy",
            Data = new Dictionary<string, object>
            {
                ["brokers.available"] = availableBrokers,
                ["lag.ms"] = consumerLag
            }
        };
    }
    catch (Exception ex)
    {
        return new FlinkHealthCheckResult
        {
            Status = FlinkHealthStatus.Failed,
            ComponentName = "Kafka",
            Description = $"Kafka connectivity failed: {ex.Message}",
            Exception = ex
        };
    }
});
```

- Healthy: Component operating within normal parameters
- Degraded: Component operational but performance reduced
- Unhealthy: Component experiencing issues but still functional
- Failed: Component non-operational, requires immediate attention
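When `CheckOverallHealthAsync` results are rolled up into one of the four levels above, the usual rule is that the system is only as healthy as its worst component. A minimal sketch of that aggregation, in Python for brevity; the enum and function names here are illustrative stand-ins for `FlinkHealthStatus`, not the framework's types:

```python
from enum import IntEnum

class HealthStatus(IntEnum):
    # Ordered by severity so the worst status has the highest value.
    HEALTHY = 0
    DEGRADED = 1
    UNHEALTHY = 2
    FAILED = 3

def overall_status(components: dict) -> HealthStatus:
    """Roll per-component results up to the worst individual status."""
    if not components:
        return HealthStatus.HEALTHY
    return max(components.values())

assert overall_status({"operator": HealthStatus.HEALTHY,
                       "kafka": HealthStatus.DEGRADED}) == HealthStatus.DEGRADED
```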
Update your FlinkJobSimulator to use comprehensive observability:
```csharp
// Program.cs
public static async Task Main(string[] args)
{
    var builder = Host.CreateApplicationBuilder(args);

    // Add FlinkDotNet observability
    builder.Services.AddFlinkObservability("FlinkJobSimulator", options =>
    {
        options.EnableOperatorMonitoring = true;
        options.EnableJobMonitoring = true;
        options.MetricsIntervalSeconds = 10;
    });

    var host = builder.Build();

    // Get observability services
    var metrics = host.Services.GetRequiredService<IFlinkMetrics>();
    var tracing = host.Services.GetRequiredService<IFlinkTracing>();
    var logger = host.Services.GetRequiredService<IFlinkLogger>();
    var healthMonitor = host.Services.GetRequiredService<IFlinkHealthMonitor>();

    // Start job with full observability
    await RunJobWithObservability(metrics, tracing, logger, healthMonitor);
}
```

Configure stress tests with observability monitoring:
```powershell
# run-local-stress-tests.ps1
param(
    [switch]$EnableDetailedObservability,
    [int]$MessageCount = 1000,
    [int]$MaxTimeMs = 10000
)

# Set observability environment variables
$env:FLINK_OBSERVABILITY_ENABLE_CONSOLE_METRICS = "true"
$env:FLINK_OBSERVABILITY_ENABLE_DETAILED_TRACING = $EnableDetailedObservability.ToString().ToLower()
$env:FLINK_OBSERVABILITY_METRICS_INTERVAL = "5"
$env:FLINK_OBSERVABILITY_HEALTH_INTERVAL = "10"

# Start stress test with observability
Write-Host "🔍 Starting stress test with enhanced observability..."
```

Enhance reliability tests with comprehensive monitoring:
```csharp
public class ReliabilityTestWithObservability
{
    private readonly IFlinkMetrics _metrics;
    private readonly IFlinkLogger _logger;
    private readonly IFlinkTracing _tracing;
    private readonly IFlinkHealthMonitor _healthMonitor;

    public async Task RunReliabilityTest()
    {
        _logger.LogJobEvent(LogLevel.Information, "reliability-test", "ReliabilityTest", "started");
        using var jobSpan = _tracing.StartJobSpan("ReliabilityTest");

        try
        {
            // Monitor health throughout the test
            var healthTask = MonitorHealthContinuously();

            // Run test with metrics collection
            await RunTestWithMetrics();

            _logger.LogJobEvent(LogLevel.Information, "reliability-test", "ReliabilityTest", "completed");
        }
        catch (Exception ex)
        {
            _metrics.RecordError("ReliabilityTest", "main", ex.GetType().Name);
            _logger.LogError("ReliabilityTest", "main", ex, "test_execution");
            throw;
        }
    }
}
```

- Async Metrics Collection: Use async patterns to avoid blocking main processing threads
- Batching: Batch metric updates to reduce overhead
- Sampling: Use trace sampling in high-throughput scenarios
- Resource Limits: Set appropriate limits for log retention and metric storage
- Graceful Degradation: Continue operations even if observability fails
- Circuit Breakers: Implement circuit breakers for external observability systems
- Fallback Strategies: Provide local logging when remote systems are unavailable
- Sensitive Data: Never log sensitive information (passwords, keys, PII)
- Access Control: Implement proper access controls for observability data
- Data Retention: Follow compliance requirements for log and metric retention
- Golden Metrics: Focus on throughput, latency, errors, and saturation
- Alerting: Set up alerts for critical thresholds
- Dashboards: Create comprehensive dashboards for different stakeholders
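The circuit-breaker recommendation above can be sketched as follows, in Python for brevity; the class and parameter names are assumptions for illustration, not FlinkDotNet APIs. After a threshold of consecutive exporter failures the breaker opens and metrics route to a local fallback until a cool-down elapses:

```python
import time
from typing import Callable, Optional

class ExportCircuitBreaker:
    """Skip a failing external exporter for a cool-down period instead of
    retrying it on every record (illustrative sketch of the pattern)."""

    def __init__(self, failure_threshold: int = 3, cooldown_s: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown_s = cooldown_s
        self.failures = 0
        self.opened_at: Optional[float] = None

    def call(self, export_fn: Callable, fallback_fn: Callable):
        # While open, route straight to the local fallback.
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown_s:
                return fallback_fn()
            # Half-open: cool-down elapsed, try the real exporter again.
            self.opened_at = None
            self.failures = 0
        try:
            result = export_fn()
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            return fallback_fn()
```

Once open, the breaker stops touching the remote collector entirely, which is what keeps a dead observability backend from degrading the job itself.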
```csharp
// Monitor memory usage
_logger.LogPerformance(LogLevel.Warning, "System", "memory",
    "heap_usage", GC.GetTotalMemory(false) / 1024 / 1024, "MB");

// Check for memory leaks in metrics collection
await _healthMonitor.CheckResourceHealthAsync();
```

```csharp
// Verify trace context propagation
var traceId = _tracing.GetCurrentTraceContext();
if (string.IsNullOrEmpty(traceId))
{
    _logger.LogError("Tracing", "context", new Exception("Missing trace context"), "propagation");
}
```

```csharp
// Implement fallback metrics
try
{
    _metrics.RecordIncomingRecord("MyOperator", "task-1");
}
catch (Exception ex)
{
    // Fallback to local metrics
    _localMetrics.Increment("records_in");
    _logger.LogError("Metrics", "collection", ex, "fallback_used");
}
```

- Enable Debug Logging: Set log level to Debug for detailed information
- Trace Correlation: Use trace IDs to correlate logs across components
- Health Checks: Regularly run health checks to identify issues early
- Metric Validation: Validate metric values for consistency
- Metrics Interval: Adjust collection intervals based on system load
- Trace Sampling: Use appropriate sampling rates for production
- Log Filtering: Filter out verbose logs in production environments
- Buffer Sizes: Tune buffer sizes for optimal performance
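Trace sampling, mentioned in the tuning points above, is typically head-based and deterministic per trace ID, so every component in the pipeline keeps or drops the same trace without coordination. A minimal sketch in Python, mirroring OpenTelemetry's ratio-based sampler in spirit; the function name is illustrative:

```python
import hashlib

def should_sample(trace_id: str, ratio: float) -> bool:
    """Keep roughly `ratio` of traces, with a consistent decision per trace ID."""
    if not 0.0 <= ratio <= 1.0:
        raise ValueError("ratio must be in [0, 1]")
    # Hash the trace ID into a uniform bucket in [0, 1) and compare to the ratio.
    digest = hashlib.sha256(trace_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < ratio

# The same trace ID always yields the same keep/drop decision:
assert should_sample("trace-abc", 0.1) == should_sample("trace-abc", 0.1)
```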
```csharp
// Before: Basic console logging
Console.WriteLine($"Processing record {recordId}");

// After: Structured observability
_logger.LogRecordProcessing(LogLevel.Information, "MyOperator", "task-1", "processing",
    recordCount: 1, context: new Dictionary<string, object> { ["record.id"] = recordId });
_metrics.RecordIncomingRecord("MyOperator", "task-1");
```

- Add FlinkDotNet.Core.Observability package reference
- Configure observability services in dependency injection
- Replace console logging with structured logging
- Add metrics collection to operators
- Implement distributed tracing
- Configure health checks
- Set up monitoring dashboards
- Configure alerting rules
- Test observability in staging environment
- Deploy to production with monitoring
- Apache Flink Metrics Documentation
- OpenTelemetry .NET Documentation
- FlinkDotNet Core Documentation
- Monitoring Best Practices
This documentation is maintained as part of the FlinkDotNet project. For questions or contributions, please refer to the main project repository.