This guide will help you get FlinkDotNet up and running, from installation through your first streaming job.
- .NET 9.0 SDK - Download here
- Docker Desktop or Podman - For local testing environment
- Java 17 - Required for Flink cluster (auto-installed in containers)
```bash
# Check .NET version (must be 9.0.x)
dotnet --version

# Check Docker
docker --version

# Install Aspire workload (required on Linux, optional on Windows/macOS)
dotnet workload install aspire
```

- Create a new .NET console application:

```bash
dotnet new console -n MyFlinkApp
cd MyFlinkApp
```

- Add the FlinkDotNet package:

```bash
dotnet add package FlinkDotNet
```

- Write your first streaming job:
```csharp
using FlinkDotNet.DataStream;

// Get Flink execution environment
var env = Flink.GetExecutionEnvironment();

// Read from Kafka
var orders = env.FromKafka("orders", "kafka:9092", "my-group");

// Transform with Flink operators (normalize the customer id,
// keeping the order type intact for the KeyBy that follows)
var highValueOrders = orders
    .Filter(o => o.Amount > 1000)
    .Map(o => o with { CustomerId = o.CustomerId.ToUpperInvariant() })
    .KeyBy(o => o.CustomerId);

// Write back to Kafka
highValueOrders.SinkToKafka("high-value-orders", "kafka:9092");

// Execute the job
await env.ExecuteAsync("HighValueOrderProcessor");
Console.WriteLine("Job submitted successfully!");
```

- Run your application:

```bash
dotnet run
```

Your job will be submitted to the configured Flink cluster and start processing data.
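To confirm the submission landed, you can also query the Flink JobManager's REST API directly. This is a sketch: the port 18002 matches the LocalTesting environment described later in this guide, so adjust it for your own cluster.

```csharp
using System.Net.Http;

// List the jobs known to the JobManager via Flink's REST API.
// Port 18002 is an assumption taken from the LocalTesting setup below.
using var http = new HttpClient();
var jobs = await http.GetStringAsync("http://localhost:18002/jobs");
Console.WriteLine(jobs); // JSON listing job ids and their status
```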
Run FlinkJobGateway as a container:
```bash
docker pull devstress/flinkdotnet:latest

docker run -p 8086:8086 \
  -e FLINK_CLUSTER_HOST=your-flink-host \
  -e FLINK_CLUSTER_PORT=8081 \
  devstress/flinkdotnet:latest
```

Access the API at http://localhost:8086.
```bash
# Clone the repository
git clone https://github.com/devstress/FlinkDotnet.git
cd FlinkDotnet

# Build the solution
dotnet build FlinkDotNet/FlinkDotNet.sln --configuration Release

# Run integration tests to validate
cd LocalTesting
dotnet test LocalTesting.IntegrationTests --configuration Release
```

Expected result: ✅ 10 integration tests pass, validating the complete pipeline.
For local development and testing, use the LocalTesting environment with .NET Aspire:
```bash
cd LocalTesting
dotnet run --project LocalTesting.FlinkSqlAppHost
```

This starts:
- Apache Flink cluster (JobManager + TaskManagers)
- Apache Kafka broker
- Temporal.io server
- FlinkJobGateway service
- Monitoring stack (Prometheus, Grafana, Loki)
Access the services:
- Aspire Dashboard: http://localhost:15000
- Flink Web UI: http://localhost:18002
- Grafana: http://localhost:3000
- Temporal Web UI: http://localhost:8233
```bash
cd LocalTesting
dotnet test LocalTesting.IntegrationTests --configuration Release
```

This validates:
- Kafka producer/consumer with Flink processing
- Basic transformations (map, filter, flatMap)
- Stateful processing (timers, event-time windows)
- Flink SQL via TableEnvironment
- Multi-step pipelines
- Temporal workflow integration
Create appsettings.json in your project:
```json
{
  "Flink": {
    "JobManagerRestAddress": "http://localhost:18002",
    "KafkaConfig": {
      "BootstrapServers": "localhost:9092",
      "GroupId": "my-consumer-group"
    }
  }
}
```

Set these for production deployments:
```bash
# Flink cluster connection
export FLINK_CLUSTER_HOST=flink-jobmanager
export FLINK_CLUSTER_PORT=8081

# Kafka configuration
export KAFKA_BOOTSTRAP_SERVERS=kafka:9092
export KAFKA_CONSUMER_GROUP=flinkdotnet-group

# Gateway settings
export GATEWAY_PORT=8086
```

Let's build a more complete streaming job:
```csharp
using FlinkDotNet.DataStream;

public record Order(
    string OrderId,
    string CustomerId,
    decimal Amount,
    string Region,
    DateTime Timestamp
);

public class OrderProcessingJob
{
    public static async Task Main(string[] args)
    {
        // Configure Flink environment
        var env = Flink.GetExecutionEnvironment();
        env.SetParallelism(4);
        env.EnableCheckpointing(TimeSpan.FromSeconds(30));

        // Read orders from Kafka
        var orders = env.FromKafka<Order>(
            topic: "orders",
            bootstrapServers: "kafka:9092",
            groupId: "order-processor"
        );

        // Filter high-value orders
        var highValueOrders = orders
            .Filter(order => order.Amount > 1000);

        // Group by region and calculate totals
        var regionalTotals = highValueOrders
            .KeyBy(order => order.Region)
            .Window(TumblingTimeWindow.Of(TimeSpan.FromMinutes(5)))
            .Reduce((a, b) => a with { Amount = a.Amount + b.Amount });

        // Write results to Kafka
        regionalTotals.SinkToKafka("regional-totals", "kafka:9092");

        // Execute the job
        var jobClient = await env.ExecuteAsync("OrderProcessingJob");
        Console.WriteLine($"Job submitted: {jobClient.JobId}");
    }
}
```

```bash
# Run the job
dotnet run

# Check Flink Web UI
open http://localhost:18002

# View Grafana dashboards
open http://localhost:3000
```

A simple filter/map/keyBy pipeline:

```csharp
var env = Flink.GetExecutionEnvironment();
var input = env.FromKafka("input", "kafka:9092", "group1");

var output = input
    .Filter(msg => msg.IsValid)
    .Map(msg => msg.Transform())
    .KeyBy(msg => msg.Key);

output.SinkToKafka("output", "kafka:9092");
await env.ExecuteAsync("SimpleTransform");
```

A windowed aggregation keyed by user:

```csharp
var env = Flink.GetExecutionEnvironment();
var events = env.FromKafka("events", "kafka:9092", "group2");

var aggregated = events
    .KeyBy(e => e.UserId)
    .Window(TumblingTimeWindow.Of(TimeSpan.FromMinutes(1)))
    .Reduce((a, b) => a with { Count = a.Count + b.Count }); // keep key fields, accumulate the count

aggregated.SinkToKafka("aggregated", "kafka:9092");
await env.ExecuteAsync("WindowedAgg");
```

A windowed join that enriches orders with customer data:

```csharp
var env = Flink.GetExecutionEnvironment();
var orders = env.FromKafka("orders", "kafka:9092", "group3");
var customers = env.FromKafka("customers", "kafka:9092", "group4");

var enriched = orders
    .Join(customers)
    .Where(order => order.CustomerId)   // join key on the orders stream
    .EqualTo(customer => customer.Id)   // join key on the customers stream
    .Window(TumblingTimeWindow.Of(TimeSpan.FromMinutes(1)))
    .Apply((order, customer) => EnrichOrder(order, customer));

enriched.SinkToKafka("enriched-orders", "kafka:9092");
await env.ExecuteAsync("OrderEnrichment");
```

Problem: Cannot connect to Flink cluster
Solution:
```bash
# Check Flink is running
curl http://localhost:18002/overview

# Verify network connectivity
docker network ls
docker network inspect <network-name>

# Check service logs
docker logs flink-jobmanager
```

Problem: Job submission fails with validation errors
Solution:
```csharp
// Enable detailed logging
var env = Flink.GetExecutionEnvironment();
env.SetLogLevel(LogLevel.Debug);

// Validate job before submission
var jobDefinition = env.GetJobDefinition();
var validation = JobDefinitionValidator.Validate(jobDefinition);
if (!validation.IsValid)
{
    Console.WriteLine($"Validation errors: {string.Join(", ", validation.Errors)}");
}
```

Problem: Low throughput or high latency
Solution:
```csharp
// Increase parallelism
env.SetParallelism(8);

// Tune buffer timeout
env.SetBufferTimeout(TimeSpan.FromMilliseconds(100));

// Enable operator chaining
env.EnableOperatorChaining();
```

See Troubleshooting Guide for more solutions.
- Basics (Day 1-2)
  - Quickstart Guide - Hello World examples
  - API Reference - Complete API documentation
- Intermediate (Day 3-7)
  - Architecture Guide - System design patterns
  - Observability - Monitoring and metrics
  - Flink 2.1 Features - Advanced capabilities
- Advanced (Day 8-14)
  - Performance Tuning - Optimization strategies
  - Deployment Guide - Production deployment
  - Learning Course - 15-day comprehensive training
- LocalTesting - Complete local development environment
  - Location: LocalTesting/
  - Run: dotnet run --project LocalTesting.FlinkSqlAppHost
- Sample Applications - Reference implementations
  - Location: Sample/
  - Run: Follow README in each sample
- Learning Course - Hands-on exercises
  - Location: LearningCourse/
  - 15 days of progressive examples
- 💬 GitHub Issues - Bug reports and feature requests
- 📧 Discussions - Ask questions and share experiences
- 🌟 Star the repo - Stay updated on releases
- 🤝 Contribute - See CONTRIBUTING.md
Before deploying to production, ensure:
- All integration tests pass locally
- Job successfully submits to Flink cluster
- Monitoring dashboards show expected metrics
- Checkpointing is enabled and working
- Error handling is implemented
- Resource limits are configured
- Security settings are applied
- Documentation is updated
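Several of these items (externalized configuration, checkpointing, bounded parallelism) map directly onto APIs shown earlier in this guide. As a sketch, reusing the environment variable names from the configuration section and assuming the same FlinkDotNet surface:

```csharp
using FlinkDotNet.DataStream;

// Externalized configuration: read the deployment variables documented
// earlier, with local-development fallbacks.
var kafka = Environment.GetEnvironmentVariable("KAFKA_BOOTSTRAP_SERVERS") ?? "localhost:9092";
var group = Environment.GetEnvironmentVariable("KAFKA_CONSUMER_GROUP") ?? "flinkdotnet-group";

var env = Flink.GetExecutionEnvironment();

// Checkpointing enabled and working
env.EnableCheckpointing(TimeSpan.FromSeconds(30));

// Resource limits: keep parallelism within what the cluster can schedule
env.SetParallelism(4);

var orders = env.FromKafka("orders", kafka, group);
```

Keeping broker addresses and consumer groups out of source code is what lets the same binary move through the checklist unchanged from staging to production.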
If you encounter issues:
- Check Troubleshooting Guide
- Search GitHub Issues
- Ask in Discussions
- Review Learning Course examples
Ready to build streaming applications? Continue to API Reference for detailed documentation.