FlinkDotNet is a comprehensive .NET framework that enables developers to build and submit streaming jobs to Apache Flink 2.0 clusters using a fluent C# API. It provides extensive compatibility with Apache Flink 2.0 features including dynamic scaling, adaptive scheduling, reactive mode, and enterprise-scale multi-cluster orchestration.
In today's data-driven enterprise landscape, choosing the right messaging and stream processing architecture is critical for scalability, reliability, and maintainability. This section analyzes why Kafka + FlinkDotNet + Temporal is well suited to modern real-time data processing at enterprise scale.
| Criteria | Apache Kafka | Amazon Kinesis | Azure Service Bus | Amazon SQS | Azure Event Hubs |
|---|---|---|---|---|---|
| Best Use Case | High-throughput streaming, event sourcing | AWS-native streaming | Enterprise messaging | Simple queuing | Big data ingestion |
| Throughput | Millions/sec | Thousands/sec | Thousands/sec | Nearly unlimited (standard), ~3,000/sec (FIFO) | Millions/sec |
| Message Retention | Configurable (days to years) | 24 hours (default), up to 365 days | 14 days max | 14 days max | 1-7 days |
| Message Ordering | Per-partition | Per-shard | Session-based | FIFO queues only | Per-partition |
| Multi-Region | Self-managed | Native | Native | Native | Native |
| Cost Model | Infrastructure + ops | Per shard/hour | Per message + storage | Per message | Per throughput unit |
| Ecosystem | Rich (Kafka Connect, etc.) | AWS-specific | Azure-specific | Limited | Azure-specific |
| Schema Evolution | Yes (Schema Registry) | Limited | No | No | Limited |
| Complexity | High | Medium | Low | Very Low | Medium |
Note: When evaluating stream processing solutions, it's important to distinguish between Kafka (the message broker) and complete streaming solutions (Kafka + Kafka Streams or Apache Flink-based solutions).
- Choose Kafka alone when you need: Message queuing, event storage, basic pub/sub, data pipeline transport
- Choose Kafka + Kafka Streams when you need: High throughput stream processing, Java/Scala ecosystem, tight Kafka integration, stream topologies, local state management
- Choose Kinesis when you have: AWS-only environment, moderate throughput needs, integrated AWS services requirement
- Choose Service Bus when you need: Enterprise messaging patterns, complex routing, Azure-native integration
- Choose SQS when you need: Simple queuing, AWS integration, low operational overhead
- Choose Event Hubs when you need: Azure-native big data ingestion, moderate complexity
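As a concrete touchpoint for .NET teams evaluating Kafka, producing messages is a few lines with the Confluent.Kafka client. This is a minimal sketch; the broker address and topic name are illustrative:

```csharp
using Confluent.Kafka;

// Minimal Kafka producer from .NET (broker and topic names are illustrative)
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var producer = new ProducerBuilder<string, string>(config).Build();

// The message key drives partition assignment, which preserves per-key ordering
var delivery = await producer.ProduceAsync(
    "orders",
    new Message<string, string> { Key = "customer-42", Value = "{\"amount\":120.50}" });

Console.WriteLine($"Delivered to {delivery.TopicPartitionOffset}");
```

Keyed production is what makes the "per-partition ordering" row in the table above actionable: all messages for `customer-42` land on the same partition in order.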
When choosing between streaming architectures, it's important to compare complete solutions. This section compares Kafka + Kafka Streams (the complete Kafka ecosystem) with FlinkDotNet + Temporal for enterprise-scale stream processing:
| Capability | Kafka + Kafka Streams | FlinkDotNet + Temporal |
|---|---|---|
| Stream Processing | Kafka Streams provides rich processing (windowing, joins, aggregations) | FlinkDotNet provides equivalent stream processing with Apache Flink 2.0 features |
| Fault Tolerance | At-least-once processing, exactly-once within Kafka topics | Exactly-once guarantees with Apache Flink checkpointing + Temporal workflows |
| State Management | Local state stores with changelog topics for fault tolerance | FlinkDotNet savepoints + Temporal durable state persistence |
| Scaling | Horizontal scaling via Kafka partitions, manual rebalancing | FlinkDotNet adaptive scheduler + automatic scaling with Temporal orchestration |
| Complex Workflows | Limited to stream processing topologies | Temporal workflows handle long-running, multi-step business processes |
| Error Handling | Stream-level error handling and dead letter queues | Temporal's advanced retry policies, compensation patterns, and workflow recovery |
| Cross-System Coordination | Limited to Kafka ecosystem, requires external orchestration | Temporal natively coordinates across Kafka + databases + APIs + external systems |
| Language Ecosystem | Java/Scala native, limited .NET support | Full .NET integration with C# APIs and .NET ecosystem |
Choose Kafka + Kafka Streams when:
- Your team has strong Java/Scala expertise
- You need tight integration with the Kafka ecosystem
- Your processing requirements fit well within stream processing topologies
- You want to minimize infrastructure complexity (single technology stack)
- Your use cases are primarily stream transformations and aggregations
Choose FlinkDotNet + Temporal when:
- Your team uses .NET and C# as primary languages
- You need complex, long-running business process orchestration
- You require advanced fault tolerance and workflow recovery capabilities
- You need to coordinate across multiple external systems and APIs
- You want Apache Flink 2.0 features like adaptive scheduling and reactive mode
Kafka + Kafka Streams Architecture:
Kafka (Message Broker) + Kafka Streams (Stream Processing)
↓ ↓
Message Topics → Stream Topologies
Partitioned Data → Stateful Processing
At-least-once → Local State Stores
FlinkDotNet + Temporal Architecture:
Kafka (Data Highway) + FlinkDotNet (Processing Engine) + Temporal (Orchestration Brain)
↓ ↓ ↓
Stream Transport → Real-time Processing → Durable Coordination
Partitioned Topics → Windowing/Aggregations → Multi-step Workflows
At-least-once → Exactly-once Processing → Workflow Guarantees
- Traditional ESB: Monolithic, vendor lock-in, limited scalability, expensive licensing
- Our Stack: Microservices-friendly, open-source, elastic scaling, cloud-native
- Serverless: Vendor lock-in, cold starts, limited processing time, complex local development
- Our Stack: Multi-cloud, consistent performance, unlimited processing time, local development with Aspire
- Big Data: Batch-oriented, complex cluster management, high latency, Java-centric
- Our Stack: Stream-first with batch capability, managed scaling, low latency, .NET ecosystem
- Pulsar + Flink + Airflow: Java-centric ecosystem, complex multi-system integration, separate orchestration layer
- Our Stack: .NET-native APIs with unified Flink integration, simplified operations via Temporal workflows
The Kafka + FlinkDotNet + Temporal architecture excels in scenarios requiring reusable patterns across diverse business cases within the same enterprise infrastructure:
Scenario: Real-time trade processing, risk calculation, and regulatory reporting
// Reusable pattern: Event-driven processing with orchestration
var tradingWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("trades")
.FlinkProcess(env => env
.FromKafka("raw-trades")
.Map(trade => trade.EnrichWithMarketData())
.Rebalance() // Dynamic scaling
.Filter(trade => trade.PassesRiskChecks())
.ToKafka("validated-trades"))
.OrchestrateLongRunning(async () => {
await settleTradeAsync();
await updatePortfolioAsync();
await generateRegulatoryReportAsync();
});

Business Cases Served by Same Architecture:
- Real-time Trading: Low-latency order processing
- Risk Management: Continuous position monitoring
- Regulatory Reporting: End-of-day compliance workflows
- Customer Notifications: Trade confirmations and alerts
- Data Analytics: Real-time dashboards and ML model feeding
Scenario: Orders from web, mobile, in-store processed through unified pipeline
// Reusable pattern: Multi-channel aggregation with coordination
var orderWorkflow = Temporal.WorkflowBuilder
.OnMultipleKafkaEvents("web-orders", "mobile-orders", "pos-orders")
.FlinkProcess(env => env
.UnionStreams("web-orders", "mobile-orders", "pos-orders")
.Map(order => order.Normalize())
.KeyBy(order => order.CustomerId)
.Window(TimeWindow.Of(Time.Minutes(5))) // Order bundling
.Aggregate(orders => orders.Combine())
.ToKafka("unified-orders"))
.OrchestrateLongRunning(async (order) => {
await inventoryCheckAsync(order);
await paymentProcessingAsync(order);
await fulfillmentCoordinationAsync(order);
await customerNotificationAsync(order);
});

Business Cases Served by Same Architecture:
- Order Processing: Multi-channel order aggregation
- Inventory Management: Real-time stock updates
- Payment Processing: Fraud detection and authorization
- Fulfillment: Warehouse and shipping coordination
- Customer Experience: Real-time order tracking
- Analytics: Customer behavior analysis and recommendations
Scenario: Production line monitoring, predictive maintenance, quality control
// Reusable pattern: IoT data processing with ML integration
var manufacturingWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("sensor-data")
.FlinkProcess(env => env
.FromKafka("iot-sensors")
.KeyBy(reading => reading.MachineId)
.Window(SlidingTimeWindow.Of(Time.Minutes(10), Time.Minutes(1)))
.Aggregate(readings => readings.CalculateMetrics())
.Filter(metrics => metrics.AnomalyScore > 0.8)
.ToKafka("anomaly-alerts"))
.OrchestrateLongRunning(async (anomaly) => {
var prediction = await callMLModelAsync(anomaly);
await scheduleMaintenanceAsync(prediction);
await notifyTechniciansAsync(anomaly);
await adjustProductionParametersAsync(prediction);
});

Business Cases Served by Same Architecture:
- Predictive Maintenance: Equipment failure prediction
- Quality Control: Real-time defect detection
- Production Optimization: Throughput maximization
- Supply Chain: Just-in-time inventory
- Energy Management: Power consumption optimization
- Compliance: Environmental and safety monitoring
Scenario: Continuous patient monitoring, care team coordination, emergency response
// Reusable pattern: Critical event processing with care coordination
var healthcareWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("patient-vitals")
.FlinkProcess(env => env
.FromKafka("vital-signs")
.KeyBy(vital => vital.PatientId)
.Map(vital => vital.AnalyzeWithAI()) // Real-time AI analysis
.Filter(analysis => analysis.RequiresIntervention)
.Rebalance() // Load balancing for critical alerts
.ToKafka("critical-alerts"))
.OrchestrateLongRunning(async (alert) => {
await notifyNursingStationAsync(alert);
await escalateToPhysicianAsync(alert);
await prepareEmergencyProtocolsAsync(alert);
await updatePatientRecordAsync(alert);
});

Business Cases Served by Same Architecture:
- Patient Monitoring: Continuous vital sign analysis
- Emergency Response: Critical event escalation
- Care Coordination: Multi-provider workflow
- Medical Records: Real-time documentation
- Resource Management: Staff and equipment allocation
- Compliance: HIPAA audit trails
Scenario: Live streaming, content moderation, audience engagement
// Reusable pattern: Media processing with real-time engagement
var mediaWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("content-streams")
.FlinkProcess(env => env
.FromKafka("live-content")
.Map(content => content.ExtractMetadata())
.Filter(content => content.PassesModeration()) // AI content moderation (synchronous check; Filter cannot await a Task)
.PartitionCustom(
(key, partitions) => Math.Abs(key % partitions), // Avoid negative partition indexes
content => content.ContentType.GetHashCode())
.ToKafka("moderated-content"))
.OrchestrateLongRunning(async (content) => {
await generateThumbnailsAsync(content);
await createSubtitlesAsync(content); // AI-powered
await distributeToChannelsAsync(content);
await trackEngagementMetricsAsync(content);
});

Business Cases Served by Same Architecture:
- Content Processing: Real-time transcoding and optimization
- Content Moderation: AI-powered safety checks
- Audience Engagement: Real-time interactions and comments
- Analytics: Viewing patterns and recommendations
- Monetization: Dynamic ad insertion
- Distribution: Multi-platform content delivery
// Pattern: Real-time AI inference with fallback strategies
var aiWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("inference-requests")
.FlinkProcess(env => env
.FromKafka("ai-requests")
.Map(request => request.PreprocessForModel())
.KeyBy(request => request.ModelType) // Route by AI model
.Rebalance() // Distribute load across AI workers
.ToKafka("preprocessed-requests"))
.OrchestrateLongRunning(async (request) => {
try {
var result = await callPrimaryAIModelAsync(request);
await cacheResultAsync(result);
return result;
} catch (ModelUnavailableException) {
return await callFallbackModelAsync(request);
}
});

AI/GenAI Use Cases:
- Document Processing: PDF/image extraction → LLM analysis → structured data
- Customer Support: Real-time chat → sentiment analysis → automated responses
- Content Generation: User input → GPT processing → personalized content
- Fraud Detection: Transaction streams → ML models → risk scoring
- Predictive Analytics: Historical data → AI models → future predictions
The architecture provides unified patterns for both business applications and DevOps workflows:
// Same patterns for CI/CD as business workflows
var buildWorkflow = Temporal.WorkflowBuilder
.OnKafkaEvent("git-commits")
.FlinkProcess(env => env
.FromKafka("code-changes")
.Filter(change => change.AffectsProduction())
.Map(change => change.DetermineTestStrategy())
.ToKafka("build-requests"))
.OrchestrateLongRunning(async (buildRequest) => {
await runTestSuiteAsync(buildRequest);
await buildArtifactsAsync(buildRequest);
await deployToStagingAsync(buildRequest);
await runIntegrationTestsAsync(buildRequest);
await deployToProductionAsync(buildRequest);
});

DevOps Benefits:
- Unified Architecture: Same infrastructure for business and DevOps
- Observability: Consistent monitoring across all workflows
- Scalability: Elastic CI/CD that scales with development team
- Reliability: Temporal's workflow guarantees for deployments
- Cost Efficiency: Shared infrastructure reduces operational overhead
Note: These are estimated costs based on typical enterprise deployments and may vary significantly based on specific requirements, scale, and implementation choices.
| Solution | Initial Setup | Annual Operations | 3-Year TCO | Vendor Lock-in Risk |
|---|---|---|---|---|
| Our Stack | Medium | Low (open-source) | $2.5M+ | Low |
| Full AWS | Low | High (per-message) | $4.2M+ | High |
| Full Azure | Low | High (per-message) | $3.8M+ | High |
| Traditional ESB | High | Very High (licensing) | $6.1M+ | Very High |
Note: These metrics represent potential improvements and will vary based on team experience, project complexity, and implementation quality.
- Time to Production: Potentially 60% faster with reusable patterns
- Developer Onboarding: .NET developers can be productive immediately
- Maintenance Overhead: Potential 70% reduction with unified architecture
- Bug Resolution: Potentially faster debugging with consistent patterns
FlinkDotNet implements extensive Apache Flink 2.0 feature support for .NET developers, including:
- Dynamic Scaling: Change job parallelism without stopping jobs
- Adaptive Scheduler: Intelligent resource management and automatic parallelism adjustment
- Reactive Mode: Automatic adaptation to available cluster resources
- Advanced Partitioning: Rebalance, rescale, forward, shuffle, broadcast, and custom partitioning
- Savepoint-based Scaling: Scale jobs using savepoints for state consistency
- Fine-grained Resource Management: Slot sharing groups and resource profiles
- Temporal Multi-cluster Orchestration: Enterprise-scale coordination across thousands of clusters
FlinkDotNet provides comprehensive support for Apache Flink 2.0's dynamic scaling capabilities:
var env = Flink.GetExecutionEnvironment();
var dataStream = env.FromCollection(new[] { 1, 2, 3, 4, 5 });
// Rebalance: Uniformly distribute data across all parallel operators
var rebalanced = dataStream
.Map(x => x * 2)
.Rebalance() // Apache Flink 2.0 rebalance operation
.Filter(x => x > 5);
// Rescale: Distribute to subset of operators (more efficient for different parallelisms)
var rescaled = dataStream
.Map(x => x * 3)
.Rescale() // Apache Flink 2.0 rescale operation
.Filter(x => x > 10);
// Forward: Direct forwarding (same parallelism required)
var forwarded = dataStream
.Forward() // Apache Flink 2.0 forward partitioning
.Map(x => x + 1);
// Shuffle: Random distribution
var shuffled = dataStream
.Shuffle() // Apache Flink 2.0 shuffle partitioning
.Map(x => x * 2);
// Broadcast: Send to all operators
var broadcasted = dataStream
.Broadcast() // Apache Flink 2.0 broadcast partitioning
.Map(x => x + 10);
// Custom partitioning
var customPartitioned = dataStream
.PartitionCustom(
(key, numPartitions) => Math.Abs(key % numPartitions), // Custom partitioner (abs guards against negative hash codes)
x => x.GetHashCode() // Key selector
);
await env.ExecuteAsync("Dynamic Partitioning Example");

var env = Flink.GetExecutionEnvironment();
// Configure parallelism and scaling parameters
env.SetParallelism(8) // Default parallelism
.SetMaxParallelism(128) // Maximum parallelism for scaling
.EnableAdaptiveScheduler() // Apache Flink 2.0 adaptive scheduler
.EnableReactiveMode(); // Apache Flink 2.0 reactive mode
var dataStream = env.FromCollection(data)
.SetParallelism(4) // Operator-specific parallelism
.SetMaxParallelism(64) // Operator-specific max parallelism
.SlotSharingGroup("data-processing") // Fine-grained resource management
.Map(x => processData(x))
.Rebalance() // Dynamic rebalancing
.SetParallelism(8); // Scale specific operation
await env.ExecuteAsync("Scalable Processing Job");

// Start job from savepoint for scaling
var env = Flink.GetExecutionEnvironment()
.FromSavepoint("/path/to/savepoint") // Restore from savepoint
.SetParallelism(16); // New parallelism
// Execute job asynchronously to get JobClient
var jobClient = await env.ExecuteAsyncJob("Scaled Job");
// Trigger savepoint for scaling
var savepointResult = await jobClient.TriggerSavepointAsync("/path/to/new/savepoint");
// Stop job with savepoint for clean scaling
var stopResult = await jobClient.StopWithSavepointAsync("/path/to/scaling/savepoint", drain: true);
// Cancel with savepoint (alternative approach)
var cancelResult = await jobClient.CancelWithSavepointAsync();
// Monitor job status during scaling
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Job {status.JobName}: {status.State}, Parallelism: {status.Parallelism}/{status.MaxParallelism}");

FlinkDotNet provides a comprehensive, multi-layered architecture supporting everything from single jobs to enterprise-scale orchestration:
// Modern DataStream API (Apache Flink 2.0 compatible)
var env = Flink.GetExecutionEnvironment();
env.SetParallelism(4)
.EnableAdaptiveScheduler()
.EnableReactiveMode();
var dataStream = env.FromCollection(new[] { 1, 2, 3, 4, 5 });
dataStream
.Map(x => x * 2)
.Rebalance() // Rebalance across all operators
.Filter(x => x > 5)
.Rescale() // Rescale to subset
.Print();
await env.ExecuteAsync("My Job");
// JobBuilder API (Alternative fluent approach)
var job = Flink.JobBuilder
.FromKafka("orders")
.Where("Amount > 100")
.GroupBy("Region")
.Aggregate("SUM", "Amount")
.ToKafka("high-value-orders");
await job.Submit("Processing Job");

// Enterprise-scale FlinkDotNet.Orchestration for thousands of clusters
var orchestra = new FlinkOrchestra(logger);
// Provision clusters with auto-scaling
await orchestra.ProvisionClusterAsync(new ClusterConfiguration
{
Name = "production-cluster",
TaskSlots = 8,
TaskManagers = 4
});
// Submit jobs with intelligent placement
var result = await orchestra.SubmitJobAsync(jobDefinition, SubmissionStrategy.BestFit);
// Start Temporal FlinkDotNet.Orchestration workflows
await orchestra.StartOrchestrationWorkflowAsync(new OrchestrationRequest
{
TargetClusters = 1000,
MinClusters = 10,
MaxClusters = 5000
});

var config = new ExecutionConfig()
.SetParallelism(8)
.SetMaxParallelism(128)
.EnableAdaptiveScheduler() // Apache Flink 2.0 intelligent scheduling
.EnableReactiveMode() // Apache Flink 2.0 elastic scaling
.SetRestartStrategy("exponential-delay") // Advanced fault tolerance
.EnableSlotSharing() // Resource optimization
.EnableObjectReuse() // Performance optimization
.SetAutoWatermarkInterval(200); // Event time processing
var env = Flink.GetExecutionEnvironment(config);

FlinkDotNet provides a complete enterprise-scale integration solution with multi-layered architecture:
- .NET SDK (FlinkDotNet.DataStream): Complete Apache Flink 2.0 streaming API
- JobBuilder SDK (Flink.JobBuilder): Fluent C# DSL for rapid development
- Intermediate Representation (IR): JSON-based job definitions
- Job Gateway: HTTP service that bridges .NET applications with Apache Flink clusters
- FlinkDotNet.Orchestration: Multi-cluster job orchestration with intelligent placement strategies
- FlinkDotNet.ClusterManager: Actor-based cluster lifecycle management
- FlinkDotNet.Temporal: Temporal.io workflow definitions for durable orchestration
- FlinkDotNet.Resilience: Circuit breakers, retry policies, and health checkers
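To make the Intermediate Representation layer concrete, the sketch below shows how a fluent pipeline could reduce to a JSON job definition. The exact IR schema is internal to Flink.JobBuilder; every field name here is illustrative only:

```csharp
using System.Text.Json;

// Hypothetical, simplified shape of a JSON IR job definition (field names are illustrative)
var jobDefinition = new
{
    jobName = "high-value-orders",
    source = new { type = "kafka", topic = "orders" },
    operations = new object[]
    {
        new { type = "filter", expression = "Amount > 100" },
        new { type = "groupBy", key = "Region" },
        new { type = "aggregate", function = "SUM", field = "Amount" }
    },
    sink = new { type = "kafka", topic = "high-value-orders" }
};

// The Job Gateway receives JSON like this and translates it to a Flink JobGraph
string json = JsonSerializer.Serialize(jobDefinition,
    new JsonSerializerOptions { WriteIndented = true });
Console.WriteLine(json);
```

Keeping the IR as plain JSON is what lets the .NET SDK stay decoupled from the JVM-side JobGraph construction.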
┌─────────────────────────────────────────────────────────────────────────────┐
│ FlinkDotNet.Orchestration │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ Cluster A │ │ Cluster B │ │ Cluster N │ ... │
│ │ (Actor-based) │ │ (Actor-based) │ │ (Actor-based) │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ FlinkDotNet.Temporal Workflows │
│ ┌──────────────────────┐ ┌──────────────────────┐ │
│ │ Auto-scaling │ │ Job Distribution │ │
│ │ Workflows │ │ Workflows │ │
│ └──────────────────────┘ └──────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Apache Flink 2.0 Compatible APIs │
│ ┌─────────────────────┐ ┌─────────────────────┐ │
│ │ FlinkDotNet │ │ Flink.JobBuilder │ │
│ │ .DataStream │ │ (Fluent DSL) │ │
│ │ (Apache Flink 2.0 │ │ (Rapid │ │
│ │ compatible API) │ │ Development) │ │
│ └─────────────────────┘ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ Apache Flink 2.0 Clusters │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ JobManager + │ │ JobManager + │ │ JobManager + │ ... │
│ │ TaskManagers │ │ TaskManagers │ │ TaskManagers │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└─────────────────────────────────────────────────────────────────────────────┘
The FlinkDotNet.Gateway acts as a bridge between .NET applications and Apache Flink 2.0 clusters, supporting advanced scaling features:
- Job Submission: .NET applications submit job definitions via HTTP to the gateway
- IR Translation: Gateway translates JSON IR to Flink JobGraph
- Cluster Communication: Gateway communicates with Flink JobManager via REST API
- Status Monitoring: Gateway provides job status and metrics back to .NET applications
- Orchestra Coordination: FlinkOrchestra manages job distribution across thousands of clusters
- Actor-based Management: Each cluster is managed by an independent ClusterActor
- Temporal Workflows: Long-running orchestration processes with exactly-once guarantees
- Intelligent Placement: Jobs routed to optimal clusters based on health, capacity, and locality
- Auto-scaling: Dynamic cluster provisioning and decommissioning based on demand
- Adaptive Scheduling: Apache Flink 2.0 adaptive scheduler integration
- Reactive Scaling: Automatic adaptation to available resources
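Putting the submission flow together, a client-side call to the gateway could look like the following sketch. The endpoint paths, payload shape, and port are assumptions for illustration; consult the gateway's actual REST contract:

```csharp
using System.Net.Http;
using System.Net.Http.Json;

// Hypothetical gateway base address and routes; names are illustrative
using var http = new HttpClient { BaseAddress = new Uri("http://flink-gateway:8080") };

// Submit the JSON IR produced by the DataStream/JobBuilder APIs
var response = await http.PostAsJsonAsync("/api/v1/jobs", new
{
    jobName = "orders-pipeline",
    parallelism = 8,
    ir = "{ ... }" // JSON IR emitted by the .NET SDK
});
response.EnsureSuccessStatusCode();

// Poll job status back through the gateway
var status = await http.GetStringAsync("/api/v1/jobs/orders-pipeline/status");
Console.WriteLine(status);
```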
┌─────────────────┐ HTTP ┌─────────────────┐ Orchestration ┌─────────────────┐
│ .NET App │─────────────▶│ FlinkDotNet │─────────────────────▶│ FlinkDotNet │
│ │ │ Gateway │ │ Orchestra │
│ DataStream/ │◀─────────────│ │◀─────────────────────│ (Multi-cluster) │
│ JobBuilder APIs │ JSON IR └─────────────────┘ Job Distribution └─────────────────┘
└─────────────────┘ │ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Apache Flink │ │ ClusterManager │
│ JobManager │◀───────────────────────│ Actors │
│ (Single) │ REST APIs + Scaling │ (Thousands) │
└─────────────────┘ └─────────────────┘
The gateway and orchestra handle:
- Authentication & Authorization: Secure access to Flink clusters
- Load Balancing: Distribute jobs across multiple Flink clusters
- Monitoring & Metrics: Real-time job status and performance metrics across all clusters
- Error Handling: Graceful error recovery and retry logic with circuit breakers
- Auto-scaling: Intelligent cluster provisioning and capacity management
- Health Aggregation: Cross-cluster health monitoring and issue detection
- Dynamic Scaling: Apache Flink 2.0 savepoint-based scaling workflows
- Adaptive Scheduling: Integration with Flink 2.0 adaptive scheduler
- Reactive Mode: Automatic parallelism adjustment based on cluster resources
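As an illustration of the retry behavior listed above, a generic exponential-backoff helper might look like this. This is a pattern sketch only; FlinkDotNet.Resilience's actual policy types and signatures may differ:

```csharp
// Generic exponential-backoff retry, illustrating the pattern behind
// FlinkDotNet.Resilience retry policies (actual library API may differ)
static async Task<T> RetryWithBackoffAsync<T>(
    Func<Task<T>> action, int maxAttempts = 5, int baseDelayMs = 200)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await action();
        }
        catch (Exception) when (attempt < maxAttempts)
        {
            // Delay doubles each attempt: 200ms, 400ms, 800ms, ...
            await Task.Delay(baseDelayMs * (1 << (attempt - 1)));
        }
    }
}

// Usage: wrap a flaky cluster call
// var health = await RetryWithBackoffAsync(() => orchestra.GetClusterHealthAsync());
```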
FlinkDotNet/
├── FlinkDotNet.Common/ # Core types and configuration
│ ├── Configuration # Configuration, ExecutionConfig with Flink 2.0 features
│ ├── TypeInfo # Types, TypeInformation
│ └── JobManagement # JobClient with scaling capabilities
├── FlinkDotNet.DataStream/ # Apache Flink 2.0 compatible streaming API
│ ├── StreamExecutionEnvironment # Main entry point with adaptive/reactive modes
│ ├── DataStream # Core streaming API with partitioning strategies
│ ├── Functions # User functions
│ └── Connectors # Sources and sinks
├── FlinkDotNet.Orchestration/ # Multi-cluster orchestration
│ ├── Services # FlinkOrchestra, ClusterActorBridge
│ ├── Models # ClusterStatus, JobSubmissionResult
│ └── Interfaces # IFlinkOrchestra, IFlinkClusterActor
├── FlinkDotNet.ClusterManager/ # Individual cluster management
│ ├── Actors # FlinkClusterActor (actor-based lifecycle)
│ ├── Models # ClusterConfiguration, ClusterMetrics
│ └── Interfaces # IFlinkClusterActor
├── FlinkDotNet.Temporal/ # Temporal.io workflow definitions
│ ├── Workflows # ClusterOrchestrationWorkflow
│ ├── Activities # Cluster management activities
│ └── Models # Workflow request/response models
├── FlinkDotNet.Resilience/ # Fault tolerance patterns
│ ├── CircuitBreakers # Prevent cascade failures
│ ├── RetryPolicies # Exponential backoff strategies
│ └── HealthCheckers # Cluster health validation
├── Flink.JobBuilder/ # Fluent DSL for rapid development
│ ├── FlinkJobBuilder # Main fluent DSL
│ ├── Models # JobDefinition, IR models
│ └── Extensions # Extension methods
├── FlinkDotNet.Table/ # Table API (future)
├── FlinkDotNet.Testing/ # Testing utilities
├── FlinkDotNet.Util/ # Utility classes
└── FlinkDotNet/ # Main unified API entry point
var env = Flink.GetExecutionEnvironment();
// Configure Apache Flink 2.0 features
env.SetParallelism(4)
.SetMaxParallelism(128) // Enable dynamic scaling
.EnableAdaptiveScheduler() // Automatic parallelism adjustment
.EnableReactiveMode() // Adapt to cluster resources
.EnableCheckpointing(5000); // Checkpointing for fault tolerance
var numbers = env.FromCollection(Enumerable.Range(1, 1000));
var result = numbers
.Filter(x => x % 2 == 0) // Even numbers only
.Map(x => x * x) // Square them
.Rebalance() // Apache Flink 2.0 rebalancing
.SetParallelism(8) // Scale this operation
.Sum(); // Sum the results
await env.ExecuteAsync("Even Squares with Dynamic Scaling");

var env = Flink.GetExecutionEnvironment();
env.EnableAdaptiveScheduler()
.EnableReactiveMode();
var dataStream = env.FromCollection(generateData());
// Demonstrate all Apache Flink 2.0 partitioning strategies
var processed = dataStream
.Map(x => processData(x))
.SetParallelism(4)
.SlotSharingGroup("data-processing") // Fine-grained resource management
// Rebalance: Uniform distribution across all operators
.Rebalance()
.Map(x => enrichData(x))
.SetParallelism(8)
// Rescale: Efficient distribution for different parallelisms
.Rescale()
.Filter(x => x.IsValid)
.SetParallelism(4)
// Forward: Direct forwarding (same parallelism)
.Forward()
.Map(x => finalProcessing(x))
.SetParallelism(4)
// Custom partitioning based on business logic
.PartitionCustom(
(key, numPartitions) => Math.Abs(key.GetHashCode() % numPartitions), // abs guards against negative hash codes
x => x.CustomerId
)
.SlotSharingGroup("customer-processing");
await env.ExecuteAsync("Advanced Partitioning Example");

var job = Flink.JobBuilder
.FromKafka("input-topic", config => {
config.BootstrapServers = "localhost:9092";
config.GroupId = "processing-group";
})
.Map("processed = transform(data)")
.Where("processed.isValid")
.ToKafka("output-topic");
// Configure Apache Flink 2.0 features for the job
await job.Configure(config => {
config.EnableAdaptiveScheduler()
.EnableReactiveMode()
.SetParallelism(8)
.SetMaxParallelism(128);
}).Submit("Kafka Processing with Auto-Scaling");

var job = Flink.JobBuilder
.FromKafka("events")
.GroupBy("userId")
.Window("TUMBLING", 5, "MINUTES")
.Aggregate("COUNT", "*")
.ToKafka("user-activity");
await job.Configure(config => {
config.EnableReactiveMode() // Adapt to cluster resources
.SetRestartStrategy("exponential-delay") // Advanced fault tolerance
.EnableSlotSharing(); // Resource optimization
}).Submit("User Activity with Reactive Scaling");

var orchestra = new FlinkOrchestra(logger);
// Provision a new cluster with Apache Flink 2.0 features
var cluster = await orchestra.ProvisionClusterAsync(new ClusterConfiguration
{
Name = "production-west",
TaskSlots = 16,
TaskManagers = 8,
Region = "us-west-2",
HighAvailability = true,
AdaptiveSchedulerEnabled = true, // Enable Apache Flink 2.0 adaptive scheduler
ReactiveModeEnabled = true // Enable reactive mode
});
// Get cluster health across all clusters
var health = await orchestra.GetClusterHealthAsync();
Console.WriteLine($"Overall Health Score: {health.OverallHealthScore:F1}%");
Console.WriteLine($"Total Clusters: {health.TotalClusters}");
Console.WriteLine($"Healthy: {health.HealthyClusters}, Critical: {health.CriticalClusters}");

// Define a job with Apache Flink 2.0 configuration
var jobDefinition = new FlinkJobDefinition
{
JobId = "analytics-pipeline",
JobName = "Real-time Analytics",
JobGraph = "...", // Generated from DataStream/JobBuilder
Parallelism = 8,
MaxParallelism = 128, // Enable dynamic scaling
AdaptiveSchedulerEnabled = true, // Intelligent resource management
ReactiveModeEnabled = true, // Automatic adaptation
Priority = JobPriority.High
};
// Submit with intelligent placement
var result = await orchestra.SubmitJobAsync(jobDefinition, SubmissionStrategy.BestFit);
if (result.Success)
{
Console.WriteLine($"Job {result.JobId} submitted to cluster {result.ClusterId}");
Console.WriteLine($"Flink Job ID: {result.FlinkJobId}");
// Monitor scaling behavior
var jobClient = result.JobClient;
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Current Parallelism: {status.Parallelism}/{status.MaxParallelism}");
}

// Execute job with scaling capabilities
var jobClient = await env.ExecuteAsyncJob("Scalable Analytics Job");
// Monitor and scale using savepoints
var status = await jobClient.GetJobStatusAsync();
Console.WriteLine($"Initial Parallelism: {status.Parallelism}");
// Create savepoint for scaling
var savepointResult = await jobClient.TriggerSavepointAsync("/savepoints/scaling-point");
if (savepointResult.Success)
{
Console.WriteLine($"Savepoint created at: {savepointResult.SavepointPath}");
// Stop job gracefully for scaling
var stopResult = await jobClient.StopWithSavepointAsync(savepointPath: savepointResult.SavepointPath, drain: true);
if (stopResult.Success)
{
// Restart with new parallelism
var scaledEnv = Flink.GetExecutionEnvironment()
.FromSavepoint(stopResult.SavepointPath) // Restore from savepoint
.SetParallelism(16) // New parallelism
.SetMaxParallelism(256) // New max parallelism
.EnableAdaptiveScheduler()
.EnableReactiveMode();
// Re-execute with scaled configuration
var scaledJobClient = await scaledEnv.ExecuteAsyncJob("Scaled Analytics Job");
var scaledStatus = await scaledJobClient.GetJobStatusAsync();
Console.WriteLine($"Scaled Parallelism: {scaledStatus.Parallelism}");
}
}

// Start long-running orchestration workflow with Apache Flink 2.0 features
var workflowId = await orchestra.StartOrchestrationWorkflowAsync(new OrchestrationRequest
{
RequestId = "scaling-request-1",
TargetClusters = 500,
MinClusters = 50,
MaxClusters = 2000,
ScalingPolicy = "demand-based",
AdaptiveSchedulerEnabled = true, // Enable intelligent scheduling across clusters
ReactiveModeEnabled = true // Enable reactive scaling
});
Console.WriteLine($"Started orchestration workflow: {workflowId}");
// Monitor and scale dynamically
var scalingResult = await orchestra.ScaleOrchestraAsync(targetCapacity: 750);
Console.WriteLine($"Scaled from {scalingResult.PreviousCapacity} to {scalingResult.NewCapacity} clusters");

FlinkDotNet includes built-in backpressure support with Apache Flink 2.0 enhancements to ensure system stability:
using Flink.JobBuilder.Backpressure;
// Configure rate limiter with adaptive behavior
var rateLimiter = new TokenBucketRateLimiter(
rateLimit: 1000.0, // 1000 operations per second
burstCapacity: 2000.0 // Handle bursts up to 2000
);
// Use in your application with automatic backpressure handling
if (rateLimiter.TryAcquire())
{
await ProcessMessage(message);
}
else
{
// Apache Flink 2.0 handles backpressure automatically
// This provides additional application-level control
await Task.Delay(100); // Wait and retry
}
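The `TryAcquire` call above follows standard token-bucket semantics. As a rough, self-contained sketch of that behavior (illustrative only, not the library's implementation):

```csharp
using System;

// Illustrative token bucket: tokens refill continuously at a fixed rate,
// capped at a burst capacity; each operation consumes one token.
public sealed class SimpleTokenBucket
{
    private readonly double _ratePerSecond;  // tokens added per second
    private readonly double _burstCapacity;  // maximum tokens the bucket holds
    private double _tokens;
    private DateTime _lastRefill = DateTime.UtcNow;

    public SimpleTokenBucket(double ratePerSecond, double burstCapacity)
    {
        _ratePerSecond = ratePerSecond;
        _burstCapacity = burstCapacity;
        _tokens = burstCapacity; // start full so an initial burst is allowed
    }

    public bool TryAcquire()
    {
        var now = DateTime.UtcNow;
        // Refill proportionally to elapsed time, capped at the burst capacity
        _tokens = Math.Min(_burstCapacity,
            _tokens + (now - _lastRefill).TotalSeconds * _ratePerSecond);
        _lastRefill = now;
        if (_tokens < 1.0) return false; // bucket empty: caller should back off
        _tokens -= 1.0;
        return true;
    }
}
```

Because tokens accumulate up to `burstCapacity`, short bursts above the steady rate are absorbed while the long-run average stays at `ratePerSecond`.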
// Configure backpressure in execution environment
var env = Flink.GetExecutionEnvironment();
env.GetConfig()
.SetProperty("taskmanager.network.memory.max-buffers-per-channel", "10")
.SetProperty("taskmanager.network.memory.buffers-per-channel", "2")
.EnableObjectReuse(); // Reduce GC pressure

FlinkDotNet includes comprehensive testing capabilities with Apache Flink 2.0 integration:
[Fact]
public async Task TestStreamProcessingWithScaling()
{
var env = Flink.GetExecutionEnvironment();
env.EnableAdaptiveScheduler()
.EnableReactiveMode()
.SetMaxParallelism(128);
var testData = new[] { 1, 2, 3, 4, 5 };
var result = env.FromCollection(testData)
.Map(x => x * 2)
.Rebalance() // Test Apache Flink 2.0 rebalancing
.SetParallelism(4) // Test dynamic parallelism
.CollectAsync();
var expected = new[] { 2, 4, 6, 8, 10 };
Assert.Equal(expected, await result);
}
[Fact]
public async Task TestSavepointBasedScaling()
{
var env = Flink.GetExecutionEnvironment();
var jobClient = await env.ExecuteAsyncJob("Test Scaling Job");
// Test savepoint creation
var savepointResult = await jobClient.TriggerSavepointAsync();
Assert.True(savepointResult.Success);
// Test graceful stopping with savepoint
var stopResult = await jobClient.StopWithSavepointAsync(drain: true);
Assert.True(stopResult.Success);
Assert.True(stopResult.Drained);
}

The project includes comprehensive stress tests that validate:
- High-throughput processing (1M+ messages)
- Backpressure handling with Apache Flink 2.0 improvements
- Fault tolerance and recovery with adaptive scheduling
- Dynamic scaling scenarios and savepoint-based workflows
- Reactive mode adaptation to resource changes
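A test in this spirit can be sketched with the rate limiter shown earlier; the message count and backoff interval here are illustrative placeholders, not the suite's actual parameters:

```csharp
[Fact]
public async Task ProcessesAllMessagesUnderBackpressure()
{
    // Illustrative volume; the real stress tests process far more (1M+ messages)
    var rateLimiter = new TokenBucketRateLimiter(
        rateLimit: 1000.0,
        burstCapacity: 2000.0);

    var processed = 0;
    for (var i = 0; i < 10_000; i++)
    {
        while (!rateLimiter.TryAcquire())
        {
            await Task.Delay(1); // back off while the bucket refills
        }
        processed++; // stand-in for ProcessMessage(message)
    }

    // Backpressure throttles but never drops: every message is processed
    Assert.Equal(10_000, processed);
}
```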
FlinkDotNet integrates with .NET Aspire for local development, including the Apache Flink 2.0 features:
// LocalTesting/Program.cs
var builder = DistributedApplication.CreateBuilder(args);
var kafka = builder.AddKafka("kafka");
// Apache Flink 2.0 cluster with advanced features
var flink = builder.AddContainer("flink", "flink:2.0-latest")
.WithEnvironment("FLINK_PROPERTIES",
"scheduler-mode: adaptive\n" +
"scheduler.adaptive.scaling-enabled: true\n" +
"scheduler.adaptive.resource.wait-timeout: 60s\n" +
"execution.checkpointing.interval: 5s\n" +
"parallelism.default: 4\n" +
"parallelism.default.sink: 8\n" +
"taskmanager.numberOfTaskSlots: 8");
var gateway = builder.AddProject<Projects.FlinkDotNet_Gateway>("gateway")
.WithReference(flink);
var testApp = builder.AddProject<Projects.TestApp>("testapp")
.WithReference(gateway)
.WithReference(kafka);
builder.Build().Run();

FlinkDotNet implements comprehensive build and test validation to ensure code quality and prevent build failures.
# Verify .NET 9.0 requirement
dotnet --version # Must show 9.0.x
# Run complete validation
./validate-build-and-tests.ps1
# Quick build check (skip tests)
./validate-build-and-tests.ps1 -SkipTests

- ✅ ALL builds MUST pass before commits/merges
- ✅ .NET 9.0.x required for all development
- ✅ Three solutions validated: FlinkDotNet, Sample, LocalTesting
- ✅ Automated blocking of build failures via GitHub Actions
# Always run before committing
./pre-commit-validation.ps1

- 📖 Complete Guide - Detailed enforcement rules and troubleshooting
- 🚀 Quick Start - 2-minute developer setup guide
Important: Build failures are automatically blocked. Fix build errors before proceeding with any development work.
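One way to enforce this locally is to wire the validation script into a Git pre-commit hook. This is a sketch; it assumes `pwsh` (PowerShell Core) is on your PATH and the script lives at the repository root:

```shell
#!/bin/sh
# Hypothetical .git/hooks/pre-commit: run validation before every commit
pwsh ./pre-commit-validation.ps1
if [ $? -ne 0 ]; then
    echo "Validation failed - commit blocked. Fix build errors first." >&2
    exit 1
fi
```

Mark the hook executable (`chmod +x .git/hooks/pre-commit`) so Git runs it automatically.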
- Install FlinkDotNet NuGet packages
  dotnet add package FlinkDotNet
  dotnet add package FlinkDotNet.DataStream
- Set up Apache Flink 2.0 cluster
  - Download and install Apache Flink 2.0
  - Start JobManager and TaskManager with the adaptive scheduler enabled
  - Configure reactive mode if desired
- Deploy FlinkDotNet.Gateway
  - Configure the connection to your Flink cluster
  - Deploy as a web service or container
  - Enable Apache Flink 2.0 feature support
- Build and submit your first job with scaling capabilities
  // Apache Flink 2.0 compatible approach (DataStream API)
  var env = Flink.GetExecutionEnvironment();
  env.EnableAdaptiveScheduler()
      .EnableReactiveMode()
      .SetMaxParallelism(128);
  var stream = env.FromCollection(new[] { 1, 2, 3 })
      .Rebalance()
      .SetParallelism(4);
  await env.ExecuteAsync("My First Scaling Job");

  // Alternative approach (JobBuilder for rapid development)
  var job = Flink.JobBuilder
      .FromKafka("source")
      .Map("value = process(data)")
      .ToKafka("destination");
  await job.Configure(config => config.EnableAdaptiveScheduler())
      .Submit("My First JobBuilder Job");
- Install additional orchestration packages
  dotnet add package FlinkDotNet.Orchestration
  dotnet add package FlinkDotNet.ClusterManager
  dotnet add package FlinkDotNet.Temporal
  dotnet add package FlinkDotNet.Resilience
- Set up Temporal Server
  # Using Docker
  docker run -p 7233:7233 -p 8233:8233 temporalio/auto-setup:latest
- Initialize Orchestra service with Apache Flink 2.0 features
  var services = new ServiceCollection();
  services.AddLogging();
  services.AddSingleton<IFlinkOrchestra, FlinkOrchestra>();
  var provider = services.BuildServiceProvider();
  var orchestra = provider.GetRequiredService<IFlinkOrchestra>();
- Start with cluster provisioning and scaling
  // Provision your first cluster with Apache Flink 2.0 features
  var cluster = await orchestra.ProvisionClusterAsync(new ClusterConfiguration
  {
      Name = "starter-cluster",
      TaskSlots = 4,
      TaskManagers = 2,
      AdaptiveSchedulerEnabled = true, // Enable intelligent scheduling
      ReactiveModeEnabled = true       // Enable automatic adaptation
  });

  // Check overall health and scaling capabilities
  var health = await orchestra.GetClusterHealthAsync();
  Console.WriteLine($"Health Score: {health.OverallHealthScore:F1}%");
  Console.WriteLine($"Adaptive Scheduler: {cluster.AdaptiveSchedulerEnabled}");
  Console.WriteLine($"Reactive Mode: {cluster.ReactiveModeEnabled}");
- Getting Started Guide
- Complete Usage Example
- Gateway Communication Guide
- Local Development Setup
- Contributing Guidelines
- Dynamic Scaling and Rebalancing Guide
- Adaptive Scheduler Configuration
- Reactive Mode Implementation
- Savepoint-based Scaling Workflows
- Fine-grained Resource Management
- Flink vs Temporal Decision Guide
- Backpressure Complete Reference
- Aspire Container Architecture
- Rate Limiting Implementation
- Local Testing Setup
- Stress Tests Overview
- Reliability Tests Overview
- Complex Logic Stress Tests
- Observability and Monitoring
- Monitoring Best Practices
FlinkDotNet provides complete Apache Flink 2.0 compatibility including:
- Adaptive Scheduler: Automatic parallelism adjustment based on workload characteristics
- Reactive Mode: Elastic scaling that adapts to available cluster resources
- Dynamic Scaling: Change job parallelism without stopping jobs using savepoints
- Advanced Partitioning: All Apache Flink 2.0 partitioning strategies (rebalance, rescale, forward, shuffle, broadcast, custom)
- Fine-grained Resource Management: Slot sharing groups and resource profiles
- Enhanced Fault Tolerance: Advanced restart strategies and checkpointing
// Enable all Apache Flink 2.0 features
var env = Flink.GetExecutionEnvironment()
.EnableAdaptiveScheduler() // Intelligent resource management
.EnableReactiveMode() // Elastic scaling
.SetMaxParallelism(256) // Dynamic scaling support
.EnableCheckpointing(5000); // Enhanced fault tolerance
var scalableStream = env.FromCollection(data)
.Rebalance() // Apache Flink 2.0 rebalancing
.SetParallelism(8) // Dynamic parallelism
.SlotSharingGroup("processing"); // Fine-grained resources

FlinkDotNet supports multiple scaling approaches:
- Reactive Mode Scaling (Automatic)
  env.EnableReactiveMode(); // Automatically adapts to cluster resources
- Adaptive Scheduler (Intelligent)
  env.EnableAdaptiveScheduler(); // Parallelism adjusted automatically by the scheduler
- Savepoint-based Scaling (Manual)
  var jobClient = await env.ExecuteAsyncJob("My Job");
  var savepoint = await jobClient.TriggerSavepointAsync();
  // Restart with new parallelism from savepoint
- Runtime Partitioning (Dynamic)
  dataStream.Rebalance(); // Redistribute uniformly
  dataStream.Rescale();   // Efficient subset distribution
  dataStream.Forward();   // Direct forwarding
  dataStream.Shuffle();   // Random distribution
Choose based on your use case:
- DataStream API: Use for Apache Flink 2.0 compatibility, complex stream processing, and when you need full control over scaling and partitioning
- JobBuilder API: Use for rapid development, simple pipelines, and when you prefer fluent syntax
- Orchestra API: Use for enterprise-scale multi-cluster deployments with thousands of jobs
Example decision matrix:
// Complex processing with scaling requirements
var env = Flink.GetExecutionEnvironment()
.EnableAdaptiveScheduler()
.EnableReactiveMode();
var stream = env.FromCollection(data)
.Rebalance()
.SetParallelism(8);
// Simple pipeline with fluent syntax
var job = Flink.JobBuilder
.FromKafka("input")
.Map("process(data)")
.ToKafka("output");
// Enterprise multi-cluster orchestration
var orchestra = new FlinkOrchestra(logger);
await orchestra.SubmitJobAsync(jobDef, SubmissionStrategy.BestFit);

FlinkDotNet maintains full compatibility while adding Apache Flink 2.0 features:
- Keep existing code: All existing DataStream and JobBuilder code continues to work
- Add Apache Flink 2.0 features gradually: Enable adaptive scheduler, reactive mode, and advanced partitioning as needed
- Scale incrementally: Start with single cluster, add orchestration layer when needed
- Optimize performance: Use new partitioning strategies and resource management features
Migration example:
// Existing code (still works)
var env = Flink.GetExecutionEnvironment();
var stream = env.FromCollection(data).Map(x => x * 2);
// Enhanced with Apache Flink 2.0 features
var enhancedEnv = Flink.GetExecutionEnvironment()
.EnableAdaptiveScheduler() // Add intelligent scheduling
.EnableReactiveMode() // Add elastic scaling
.SetMaxParallelism(128); // Enable dynamic scaling
var enhancedStream = enhancedEnv.FromCollection(data)
.Map(x => x * 2)
.Rebalance() // Add efficient rebalancing
.SetParallelism(8); // Set optimal parallelism

The architecture is designed for incremental adoption - you can start with basic features and scale to enterprise levels with Apache Flink 2.0 capabilities as your requirements grow.
- Fork the repository
- Create a feature branch
- Make your changes
- Add tests
- Submit a pull request
See CONTRIBUTING.md for detailed guidelines.
MIT License - see LICENSE file for details.