This guide will walk you through setting up the Flink.JobBuilder SDK and deploying Apache Flink infrastructure to run your first streaming job.
- .NET SDK: Ensure you have the .NET 9.0 SDK or later installed. You can download it from the official .NET download page.
- IDE (Optional but Recommended): An IDE like Visual Studio, JetBrains Rider, or VS Code can greatly improve your development experience.
- Docker and Kubernetes: For deploying Apache Flink cluster infrastructure.
- Apache Flink Cluster: You'll need either:
- A Kubernetes cluster with our provided manifests (recommended)
- A local Apache Flink installation following the Apache Flink Local Installation Guide
Start by creating a new .NET console application. You can do this using the .NET CLI or your preferred IDE.
dotnet new console -n MyFlinkJobApp
cd MyFlinkJobApp

Add the Flink.JobBuilder SDK to your project:
dotnet add package Flink.JobBuilder

Let's create a simple streaming job that processes data using Apache Flink:
// Program.cs
using Flink.JobBuilder;
using Flink.JobBuilder.Extensions;
using Microsoft.Extensions.DependencyInjection;

public class Program
{
    public static async Task Main(string[] args)
    {
        // 1. Set up dependency injection
        var services = new ServiceCollection();
        services.AddFlinkJobBuilder(config =>
        {
            config.BaseUrl = "http://localhost:8080"; // Flink Job Gateway URL
        });
        var serviceProvider = services.BuildServiceProvider();

        // 2. Create a streaming job with the fluent API
        var job = serviceProvider.CreateJobBuilder()
            .FromKafka("orders")
            .Where("Amount > 100")
            .GroupBy("Region")
            .Aggregate("SUM", "Amount")
            .ToKafka("high-value-orders");

        // 3. Submit to the Apache Flink cluster
        try
        {
            var result = await job.Submit("OrderProcessingJob");
            Console.WriteLine("Job submitted successfully!");
            Console.WriteLine($"Flink Job ID: {result.FlinkJobId}");
            Console.WriteLine($"Status: {result.Status}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to submit job: {ex.Message}");
        }
    }
}

To run your job, you need Apache Flink cluster infrastructure. Choose one of the following options:
Deploy the complete Flink.NET ecosystem to Kubernetes:
# Clone the repository
git clone https://github.com/devstress/FlinkDotnet.git
cd FlinkDotnet
# Deploy to Kubernetes
kubectl apply -f k8s/

This deploys:
- Apache Flink cluster (JobManager + TaskManager)
- Flink Job Gateway (.NET ASP.NET Core service)
- Apache Kafka + Zookeeper
- All necessary services and configuration
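After applying the manifests, it is worth confirming that the pods came up before submitting jobs. This sketch assumes the manifests deploy into the flink-system namespace (the same namespace used by the port-forward commands later in this guide); the deployment name is an assumption:

```shell
# List all pods for the Flink.NET deployment
kubectl get pods -n flink-system

# Optionally wait until the JobManager deployment is available
# (deployment/flink-jobmanager is an assumed resource name)
kubectl wait --for=condition=available deployment/flink-jobmanager \
  -n flink-system --timeout=120s
```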
For local development and testing:
# Build all components
./build-all.sh
# Run with Aspire orchestration
cd Sample/FlinkDotNetAspire.AppHost.AppHost
dotnet run

Once your infrastructure is running:
# Run your .NET application
dotnet run

The application will:
- Generate JSON IR (intermediate representation) from your C# job definition
- Submit it to the Flink Job Gateway
- The gateway then translates it into Apache Flink DataStream jobs
- The job executes on the Flink cluster
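To make the first step concrete, here is a sketch of what the JSON IR for the order-processing job above could look like. The exact field names and shape are assumptions, not the SDK's actual schema; inspect the output of your own job builder (e.g., via BuildJobDefinition()) for the authoritative format.

```json
{
  "jobName": "OrderProcessingJob",
  "source": { "type": "kafka", "topic": "orders" },
  "operations": [
    { "type": "filter", "expression": "Amount > 100" },
    { "type": "groupBy", "key": "Region" },
    { "type": "aggregate", "function": "SUM", "field": "Amount" }
  ],
  "sink": { "type": "kafka", "topic": "high-value-orders" }
}
```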
Access monitoring interfaces:
# Flink Web UI (if running locally)
kubectl port-forward svc/flink-jobmanager-ui 8081:8081 -n flink-system
# Visit http://localhost:8081
# Job Gateway API
kubectl port-forward svc/flink-job-gateway 8080:8080 -n flink-system
# Visit http://localhost:8080/swagger-ui.html

- Deployment Guide: Learn about production deployment
- Sample Applications: Explore working examples
- API Reference: Detailed SDK documentation
- Architecture Overview: Understand the system design
Job submission fails with connection error:
- Ensure Flink Job Gateway is running and accessible
- Check the BaseUrl configuration in your application
- Verify network connectivity to the gateway
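One quick way to rule out connectivity issues is to probe the gateway directly from the machine running your application. The swagger-ui path is the one used in the monitoring section of this guide; adjust the host and port to match your BaseUrl:

```shell
# Print the HTTP status code returned by the gateway; 200 means it is reachable.
curl -sS -o /dev/null -w "%{http_code}\n" http://localhost:8080/swagger-ui.html
```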
Gateway returns validation errors:
- Check your job definition syntax
- Ensure all required fields are provided
- Review the IR generated by your job builder
Infrastructure deployment issues:
- Verify Kubernetes cluster has sufficient resources
- Check pod logs:
kubectl logs -n flink-system <pod-name>
- Ensure all images are accessible from your cluster
The SDK can be configured through:
- Environment variables.
- A configuration file (e.g., appsettings.json).
- Programmatic configuration.
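For the programmatic route, here is a minimal sketch reusing the AddFlinkJobBuilder call from the quick-start above; the FLINK_GATEWAY_URL variable name is a hypothetical choice for this example, not an SDK convention:

```csharp
using Flink.JobBuilder.Extensions;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddFlinkJobBuilder(config =>
{
    // Prefer an environment variable when set; otherwise fall back to the
    // local default. FLINK_GATEWAY_URL is a hypothetical name for this sketch.
    config.BaseUrl = Environment.GetEnvironmentVariable("FLINK_GATEWAY_URL")
        ?? "http://localhost:8080";
});
```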
Real Configuration Example from Sample/FlinkDotNet.Aspire.AppHost/Program.cs:
{
  "Flink": {
    "JobManagerRestAddress": "http://localhost:8081",
    "JobManagerRpcAddress": "flink-jobmanager",
    "TaskManagerConfig": {
      "NumberOfTaskSlots": 10,
      "MemorySize": "8192m"
    },
    "KafkaConfig": {
      "BootstrapServers": "kafka-kraft-cluster:9092",
      "GroupId": "flink-dotnet-consumer-group",
      "AutoOffsetReset": "earliest"
    },
    "RedisConnectionString": "localhost:6379",
    "Backpressure": {
      "MaxConsumerLag": 10000,
      "ScalingThreshold": 5000,
      "MonitoringInterval": "00:00:05",
      "QuotaEnforcement": "Per-client, per-IP",
      "DynamicRebalancing": true
    }
  }
}

This configuration is based on the real Aspire AppHost setup used in BDD testing scenarios, with actual container configuration values from the working test infrastructure.
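If you keep these values in appsettings.json, they can be read at startup with the standard .NET configuration builder. This is a generic sketch (it assumes the Microsoft.Extensions.Configuration.Json package is referenced), not a Flink.NET-specific API:

```csharp
using Microsoft.Extensions.Configuration;

var configuration = new ConfigurationBuilder()
    .AddJsonFile("appsettings.json", optional: false)
    .AddEnvironmentVariables() // let environment variables override file values
    .Build();

// Read individual values from the "Flink" section shown above.
var restAddress = configuration["Flink:JobManagerRestAddress"];        // "http://localhost:8081"
var bootstrap = configuration["Flink:KafkaConfig:BootstrapServers"];   // "kafka-kraft-cluster:9092"
```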
(Refer to Flink.NET's specific documentation for details on how to configure the job submission and connection to Flink.)
Once your code is ready and the Flink connection is configured:
- Ensure your stream processing cluster is running. You can typically start it by navigating to your Flink installation's bin directory and running ./start-cluster.sh (on Linux/macOS) or start-cluster.bat (on Windows).
- Run your .NET application:
dotnet run
You should see the odd numbers (1, 3, 5, 7, 9) printed in the console output from your Flink job. You can also monitor the job through the Flink Web UI (usually at http://localhost:8081).
- Explore different [[Operators|Developing-Operators]] available in Flink.NET.
- Learn about [[Connectors|Connectors-Overview]] to read from and write to external systems.
- Understand [[State Management|Core-Concepts-State-Management-Overview]] for building stateful applications.
Flink.NET provides comprehensive testing capabilities with both BDD specifications and C# implementations:
Our testing follows Behavior-Driven Development (BDD) patterns using SpecFlow/Reqnroll:
📁 Test Structure:
- BDD Feature Files (in /Sample/FlinkDotNet.Aspire.IntegrationTests/Features/):
  - StressTest.feature - Basic performance testing scenarios
  - ComplexLogicStressTest.feature - Advanced integration scenarios
  - BackpressureTest.feature - Rate limiting and backpressure scenarios
- C# Step Definitions (in /Sample/FlinkDotNet.Aspire.IntegrationTests/StepDefinitions/):
  - StressTestStepDefinitions.cs - Basic stress test implementations
  - ComplexLogicStressTestStepDefinitions.cs - Complex logic implementations
  - BackpressureTestStepDefinitions.cs - Backpressure implementations
BDD Scenario Example:
# From StressTest.feature
Scenario: Basic Job Submission Test
  Given the Flink cluster is running
  When I create a simple streaming job:
    | Source    | Kafka topic "test-input"      |
    | Transform | Map each message to uppercase |
    | Sink      | Kafka topic "test-output"     |
  Then the job should be submitted successfully

Corresponding C# Implementation:
// From StressTestStepDefinitions.cs
[Given(@"the Flink cluster is running")]
public void GivenTheFlinkClusterIsRunning()
{
    var clusterHealthy = ValidateFlinkCluster();
    Assert.True(clusterHealthy, "Flink cluster should be running and healthy");
}

[When(@"I create a simple streaming job:")]
public void WhenICreateASimpleStreamingJob(Table table)
{
    // Uses the main FlinkJobBuilder API:
    _jobBuilder = FlinkJobBuilder
        .FromKafka("test-input")
        .Map("message => message.toUpperCase()")
        .ToKafka("test-output");
    _jobDefinition = _jobBuilder.BuildJobDefinition();
}

[Then(@"the job should be submitted successfully")]
public async Task ThenTheJobShouldBeSubmittedSuccessfully()
{
    var result = await _jobBuilder.Submit("TestJob");
    Assert.True(result.IsSuccess, "Job submission should succeed");
}

- Integration Tests: For quick validation of AppHost structure and basic configuration
  - Run: dotnet test --filter "Category=integration_test"
  - Implementation: Basic service connectivity and configuration validation
- Stress Tests: For high-throughput performance validation with full orchestration
  - Run: dotnet test --filter "Category=stress_test"
  - Implementation: Process 1M+ messages with performance validation
  - Guide: [[Stress Tests Overview|Stress-Tests-Overview]] with complete C# implementations
- Complex Logic Tests: For advanced scenarios with correlation tracking and HTTP processing
  - Run: dotnet test --filter "Category=complex_logic_test"
  - Implementation: Security tokens, HTTP batching, correlation matching
  - Guide: [[Complex Logic Stress Tests|Complex-Logic-Stress-Tests]] with detailed C# mappings
- Backpressure Tests: For rate limiting and flow control validation
  - Run: dotnet test --filter "Category=backpressure_test"
  - Implementation: Rate limiting, buffer pools, multi-tier enforcement
  - Guide: [[Rate Limiting Implementation|Rate-Limiting-Implementation-Tutorial]]
# Run all tests
cd Sample/FlinkDotNet.Aspire.IntegrationTests
dotnet test
# Run specific test categories
dotnet test --filter "Category=stress_test"
dotnet test --filter "Category=complex_logic_test"
dotnet test --filter "Category=backpressure_test"
# Run with detailed output
dotnet test --logger "console;verbosity=detailed"

- Read BDD Scenarios: Start with .feature files to understand business requirements
- Examine Step Definitions: Review the corresponding .cs files to see the C# implementations
- Trace to Core Code: Follow step definitions to the main FlinkJobBuilder and related classes
- Run and Debug: Execute tests with debugger to see the full flow
For complete examples of how BDD scenarios map to C# implementations, see:
- [[Stress Tests Overview|Stress-Tests-Overview]] - Basic message processing examples
- [[Complex Logic Stress Tests|Complex-Logic-Stress-Tests]] - Advanced integration patterns
- Local Development: Use the [[Deployment Local|Deployment-Local]] environment for interactive development and debugging
External References: