Getting Started with FlinkDotnet

This guide will walk you through setting up the Flink.JobBuilder SDK and deploying Apache Flink infrastructure to run your first streaming job.

Prerequisites

  • .NET SDK: Install the .NET 9.0 SDK or later. It is available from the official .NET download page.
  • IDE (Optional but Recommended): An IDE like Visual Studio, JetBrains Rider, or VS Code can greatly improve your development experience.
  • Docker and Kubernetes: For deploying Apache Flink cluster infrastructure.
  • Apache Flink Cluster: You'll need a running cluster. Step 4 describes two ways to deploy one (Kubernetes, or local development with Aspire).

1. Create a New .NET Project

Start by creating a new .NET console application. You can do this using the .NET CLI or your preferred IDE.

dotnet new console -n MyFlinkJobApp
cd MyFlinkJobApp

2. Add Flink.JobBuilder NuGet Package

Add the Flink.JobBuilder SDK to your project:

dotnet add package Flink.JobBuilder

3. Write Your First Apache Flink Job

Let's create a simple streaming job that processes data using Apache Flink:

// Program.cs
using Flink.JobBuilder;
using Flink.JobBuilder.Extensions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

public class Program
{
    public static async Task Main(string[] args)
    {
        // 1. Set up dependency injection
        var services = new ServiceCollection();
        services.AddFlinkJobBuilder(config =>
        {
            config.BaseUrl = "http://localhost:8080"; // Flink Job Gateway URL
        });

        var serviceProvider = services.BuildServiceProvider();

        // 2. Create a streaming job with fluent API
        var job = serviceProvider.CreateJobBuilder()
            .FromKafka("orders")
            .Where("Amount > 100")
            .GroupBy("Region")
            .Aggregate("SUM", "Amount")
            .ToKafka("high-value-orders");

        // 3. Submit to Apache Flink cluster
        try
        {
            var result = await job.Submit("OrderProcessingJob");
            Console.WriteLine("Job submitted successfully!");
            Console.WriteLine($"Flink Job ID: {result.FlinkJobId}");
            Console.WriteLine($"Status: {result.Status}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Failed to submit job: {ex.Message}");
        }
    }
}

4. Deploy Apache Flink Infrastructure

To run your job, you need Apache Flink cluster infrastructure. Choose one of the options:

Option A: Kubernetes Deployment (Recommended)

Deploy the complete Flink.NET ecosystem to Kubernetes:

# Clone the repository
git clone https://github.com/devstress/FlinkDotnet.git
cd FlinkDotnet

# Deploy to Kubernetes
kubectl apply -f k8s/

This deploys:

  • Apache Flink cluster (JobManager + TaskManager)
  • Flink Job Gateway (.NET ASP.NET Core service)
  • Apache Kafka + Zookeeper
  • All necessary services and configuration

Option B: Local Development with Aspire

For local development and testing:

# Build all components
./build-all.sh

# Run with Aspire orchestration
cd Sample/FlinkDotNetAspire.AppHost.AppHost
dotnet run

5. Run Your Application

Once your infrastructure is running:

# Run your .NET application
dotnet run

When you run the application:

  1. The SDK generates a JSON IR (intermediate representation) from your C# job definition.
  2. The application submits the IR to the Flink Job Gateway.
  3. The gateway translates the IR into an Apache Flink DataStream job.
  4. The job executes on the Flink cluster.
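
To make the first step more concrete, here is a minimal sketch of serializing a job definition to JSON with System.Text.Json. The `JobIr` and `Operation` types and every field name are assumptions made for illustration. This guide does not show the schema that Flink.JobBuilder actually emits, so treat the output shape as hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Encodings.Web;
using System.Text.Json;

// Hypothetical IR types for illustration only; not the SDK's real schema.
public record Operation(string Type, string Expression);
public record JobIr(string JobName, string SourceTopic, List<Operation> Operations, string SinkTopic);

public static class IrSketch
{
    public static string ToJson(JobIr job) =>
        JsonSerializer.Serialize(job, new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
            WriteIndented = true,
            // Keep characters such as '>' readable instead of \u003E escapes.
            Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping
        });

    public static void Main()
    {
        // Mirrors the fluent job from step 3: filter, group, aggregate.
        var job = new JobIr(
            "OrderProcessingJob",
            "orders",
            new List<Operation>
            {
                new("filter", "Amount > 100"),
                new("groupBy", "Region"),
                new("aggregate", "SUM(Amount)")
            },
            "high-value-orders");

        Console.WriteLine(ToJson(job));
    }
}
```

The point of an IR like this is that the .NET side only describes the job. The gateway owns the translation into Flink operators, so the two sides can evolve independently as long as they agree on the JSON contract.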

6. Monitor Your Job

Access monitoring interfaces:

# Flink Web UI (port-forwarded from the Kubernetes deployment)
kubectl port-forward svc/flink-jobmanager-ui 8081:8081 -n flink-system
# Visit http://localhost:8081

# Job Gateway API
kubectl port-forward svc/flink-job-gateway 8080:8080 -n flink-system
# Visit http://localhost:8080/swagger-ui.html
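
Besides the Web UI, job states can be read programmatically from the Flink REST API. The sketch below assumes the JobManager REST endpoint is reachable at http://localhost:8081 (for example through the port-forward above) and reads Flink's `/jobs/overview` endpoint. The `JobMonitor` class is a hypothetical helper, not part of Flink.JobBuilder.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class JobMonitor
{
    // Maps each job ID ("jid") to its state (e.g. RUNNING, FINISHED, FAILED).
    public static Dictionary<string, string> ParseStates(string jobsOverviewJson)
    {
        var states = new Dictionary<string, string>();
        using var doc = JsonDocument.Parse(jobsOverviewJson);
        foreach (var job in doc.RootElement.GetProperty("jobs").EnumerateArray())
        {
            states[job.GetProperty("jid").GetString()!] = job.GetProperty("state").GetString()!;
        }
        return states;
    }

    public static async Task Main()
    {
        using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
        // Assumed address; adjust to wherever the JobManager REST API is exposed.
        var json = await client.GetStringAsync("http://localhost:8081/jobs/overview");
        foreach (var (id, state) in ParseStates(json))
        {
            Console.WriteLine($"{id}: {state}");
        }
    }
}
```

The job ID printed here should correspond to the `FlinkJobId` value returned when the job was submitted in step 3.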

Troubleshooting

Common Issues

Job submission fails with connection error:

  • Ensure Flink Job Gateway is running and accessible
  • Check the BaseUrl configuration in your application
  • Verify network connectivity to the gateway
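
A quick way to separate "gateway unreachable" from "gateway rejected the job" is to probe the configured BaseUrl before submitting. The following is a minimal sketch under the assumption that any HTTP response, even an error status, proves the gateway is listening. `GatewayProbe` is a hypothetical helper and not part of the SDK.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

public static class GatewayProbe
{
    // Returns true if anything answers HTTP at baseUrl, regardless of status code.
    public static async Task<bool> IsReachableAsync(string baseUrl, TimeSpan timeout)
    {
        using var client = new HttpClient { Timeout = timeout };
        try
        {
            using var response = await client.GetAsync(baseUrl);
            Console.WriteLine($"Gateway answered with HTTP {(int)response.StatusCode}");
            return true;
        }
        catch (HttpRequestException ex)
        {
            // Covers DNS failures, refused connections, and similar transport errors.
            Console.WriteLine($"Cannot connect to {baseUrl}: {ex.Message}");
            return false;
        }
        catch (TaskCanceledException)
        {
            // HttpClient reports an elapsed Timeout as a TaskCanceledException.
            Console.WriteLine($"No response from {baseUrl} within {timeout.TotalSeconds}s");
            return false;
        }
    }

    public static async Task Main()
    {
        // Same URL as the BaseUrl configured in step 3.
        var ok = await IsReachableAsync("http://localhost:8080", TimeSpan.FromSeconds(5));
        Console.WriteLine(ok ? "Gateway reachable" : "Gateway unreachable");
    }
}
```

If the probe fails while the gateway pod is running, the problem is most likely the port-forward, the BaseUrl value, or a network policy rather than the job definition.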

Gateway returns validation errors:

  • Check your job definition syntax
  • Ensure all required fields are provided
  • Review the IR generated by your job builder

Infrastructure deployment issues:

  • Verify Kubernetes cluster has sufficient resources
  • Check pod logs: kubectl logs -n flink-system <pod-name>
  • Ensure all images are accessible from your cluster

Connection settings can be supplied in several ways:

  • Setting environment variables.
  • A configuration file (e.g., appsettings.json).
  • Programmatic configuration.
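
The configuration sources listed above (environment variables, a configuration file, programmatic configuration) are usually layered with a fixed precedence. Here is a minimal sketch of one such layering for the gateway URL. The environment variable name `FLINK_GATEWAY_URL` and the `GatewayUrlResolver` class are hypothetical, chosen only for this example. The guide does not say which variable names the SDK reads, if any.

```csharp
using System;

public static class GatewayUrlResolver
{
    public const string EnvVar = "FLINK_GATEWAY_URL";      // hypothetical name
    public const string Default = "http://localhost:8080"; // value used in step 3

    // Precedence: explicit (programmatic) value > environment variable > default.
    // getEnv is injectable so the lookup can be tested without touching the process environment.
    public static string Resolve(string? explicitUrl = null, Func<string, string?>? getEnv = null)
    {
        getEnv ??= name => Environment.GetEnvironmentVariable(name);

        if (!string.IsNullOrWhiteSpace(explicitUrl))
        {
            return explicitUrl;
        }

        var fromEnv = getEnv(EnvVar);
        return string.IsNullOrWhiteSpace(fromEnv) ? Default : fromEnv;
    }

    public static void Main()
    {
        Console.WriteLine($"Gateway URL: {Resolve()}");
    }
}
```

The resolved value could then be assigned inside the `AddFlinkJobBuilder` callback from step 3, for example `config.BaseUrl = GatewayUrlResolver.Resolve();`.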

Real Configuration Example from Sample/FlinkDotNet.Aspire.AppHost/Program.cs:

{
  "Flink": {
    "JobManagerRestAddress": "http://localhost:8081",
    "JobManagerRpcAddress": "flink-jobmanager",
    "TaskManagerConfig": {
      "NumberOfTaskSlots": 10,
      "MemorySize": "8192m"
    },
    "KafkaConfig": {
      "BootstrapServers": "kafka-kraft-cluster:9092",
      "GroupId": "flink-dotnet-consumer-group",
      "AutoOffsetReset": "earliest"
    },
    "RedisConnectionString": "localhost:6379",
    "Backpressure": {
      "MaxConsumerLag": 10000,
      "ScalingThreshold": 5000,
      "MonitoringInterval": "00:00:05",
      "QuotaEnforcement": "Per-client, per-IP",
      "DynamicRebalancing": true
    }
  }
}

This configuration is based on the real Aspire AppHost setup used in BDD testing scenarios, with actual container configuration values from the working test infrastructure.
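
As a rough illustration of consuming these settings, the sketch below reads a trimmed copy of the JSON above with System.Text.Json and converts a few values to typed form. `ConfigSketch` is a hypothetical helper. The sample project may well bind its configuration differently (for instance through Microsoft.Extensions.Configuration), which this guide does not show.

```csharp
using System;
using System.Globalization;
using System.Text.Json;

public static class ConfigSketch
{
    // Trimmed copy of the configuration example above.
    public const string Json = """
        {
          "Flink": {
            "JobManagerRestAddress": "http://localhost:8081",
            "KafkaConfig": { "BootstrapServers": "kafka-kraft-cluster:9092" },
            "Backpressure": { "MaxConsumerLag": 10000, "MonitoringInterval": "00:00:05" }
          }
        }
        """;

    public static (Uri Rest, string Brokers, int MaxLag, TimeSpan Interval) Read(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var flink = doc.RootElement.GetProperty("Flink");
        var backpressure = flink.GetProperty("Backpressure");

        return (
            new Uri(flink.GetProperty("JobManagerRestAddress").GetString()!),
            flink.GetProperty("KafkaConfig").GetProperty("BootstrapServers").GetString()!,
            backpressure.GetProperty("MaxConsumerLag").GetInt32(),
            // "hh:mm:ss" strings map directly onto TimeSpan.
            TimeSpan.Parse(backpressure.GetProperty("MonitoringInterval").GetString()!, CultureInfo.InvariantCulture));
    }

    public static void Main()
    {
        var (rest, brokers, maxLag, interval) = Read(Json);
        Console.WriteLine($"REST: {rest}, Kafka: {brokers}, max lag: {maxLag}, interval: {interval.TotalSeconds}s");
    }
}
```

Parsing into typed values early means a malformed address or interval fails at startup rather than in the middle of a job submission.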

(Refer to Flink.NET's specific documentation for details on how to configure the job submission and connection to Flink.)

5. Run Your Application

Once your code is ready and the Flink connection is configured:

  1. Ensure your stream processing cluster is running. You can typically start it by navigating to your Flink installation's bin directory and running ./start-cluster.sh (on Linux/macOS) or start-cluster.bat (on Windows).

  2. Run your .NET application:

    dotnet run

You should see the odd numbers (1, 3, 5, 7, 9) printed in the console output from your Flink job. You can also monitor the job through the Flink Web UI (usually at http://localhost:8081).

Next Steps

  • Explore different [[Operators|Developing-Operators]] available in Flink.NET.
  • Learn about [[Connectors|Connectors-Overview]] to read from and write to external systems.
  • Understand [[State Management|Core-Concepts-State-Management-Overview]] for building stateful applications.

Testing Your Applications

Flink.NET provides comprehensive testing capabilities with both BDD specifications and C# implementations:

🧪 BDD Testing Framework

Our testing follows Behavior-Driven Development (BDD) patterns using SpecFlow/Reqnroll:

📁 Test Structure:

🔄 Example: BDD to C# Mapping

BDD Scenario Example:

# From StressTest.feature
Scenario: Basic Job Submission Test
  Given the Flink cluster is running
  When I create a simple streaming job:
    | Source | Kafka topic "test-input" |
    | Transform | Map each message to uppercase |
    | Sink | Kafka topic "test-output" |
  Then the job should be submitted successfully

Corresponding C# Implementation:

// From StressTestStepDefinitions.cs
[Given(@"the Flink cluster is running")]
public void GivenTheFlinkClusterIsRunning()
{
    var clusterHealthy = ValidateFlinkCluster();
    Assert.True(clusterHealthy, "Flink cluster should be running and healthy");
}

[When(@"I create a simple streaming job:")]
public void WhenICreateASimpleStreamingJob(Table table)
{
    // Uses the main FlinkJobBuilder API:
    _jobBuilder = FlinkJobBuilder
        .FromKafka("test-input")
        .Map("message => message.toUpperCase()")
        .ToKafka("test-output");
        
    _jobDefinition = _jobBuilder.BuildJobDefinition();
}

[Then(@"the job should be submitted successfully")]
public async Task ThenTheJobShouldBeSubmittedSuccessfully()
{
    var result = await _jobBuilder.Submit("TestJob");
    Assert.True(result.IsSuccess, "Job submission should succeed");
}
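
The step definitions above call a `ValidateFlinkCluster()` helper whose body is not shown. One plausible way to implement such a check is to query the Flink REST API's `/overview` endpoint and require at least one registered TaskManager. The sketch below is an assumption about how that could look, not the sample project's actual code. The `ClusterHealth` class, the async signature, and the health criteria are all illustrative.

```csharp
using System;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static class ClusterHealth
{
    // Healthy here means: at least one TaskManager and at least one task slot in total.
    public static bool IsHealthy(string overviewJson)
    {
        using var doc = JsonDocument.Parse(overviewJson);
        var root = doc.RootElement;
        return root.TryGetProperty("taskmanagers", out var taskManagers) && taskManagers.GetInt32() > 0
            && root.TryGetProperty("slots-total", out var slots) && slots.GetInt32() > 0;
    }

    public static async Task<bool> ValidateFlinkClusterAsync(string restAddress)
    {
        using var client = new HttpClient { Timeout = TimeSpan.FromSeconds(5) };
        try
        {
            var json = await client.GetStringAsync($"{restAddress.TrimEnd('/')}/overview");
            return IsHealthy(json);
        }
        catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException or JsonException)
        {
            // Unreachable, timed out, or not a Flink REST endpoint: treat as unhealthy.
            return false;
        }
    }

    public static async Task Main()
    {
        // Assumed JobManager REST address; matches the Web UI port used earlier in this guide.
        var healthy = await ValidateFlinkClusterAsync("http://localhost:8081");
        Console.WriteLine(healthy ? "Cluster healthy" : "Cluster not healthy");
    }
}
```

Keeping the JSON check in a separate pure method (`IsHealthy`) makes the criteria unit-testable without a running cluster.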

🎯 Test Categories

  • Integration Tests: For quick validation of AppHost structure and basic configuration

    • Run: dotnet test --filter "Category=integration_test"
    • Implementation: Basic service connectivity and configuration validation
  • Stress Tests: For high-throughput performance validation with full orchestration

    • Run: dotnet test --filter "Category=stress_test"
    • Implementation: Process 1M+ messages with performance validation
    • Guide: [[Stress Tests Overview|Stress-Tests-Overview]] with complete C# implementations
  • Complex Logic Tests: For advanced scenarios with correlation tracking and HTTP processing

    • Run: dotnet test --filter "Category=complex_logic_test"
    • Implementation: Security tokens, HTTP batching, correlation matching
    • Guide: [[Complex Logic Stress Tests|Complex-Logic-Stress-Tests]] with detailed C# mappings
  • Backpressure Tests: For rate limiting and flow control validation

    • Run: dotnet test --filter "Category=backpressure_test"
    • Implementation: Rate limiting, buffer pools, multi-tier enforcement
    • Guide: [[Rate Limiting Implementation|Rate-Limiting-Implementation-Tutorial]]

🚀 Running Tests

# Run all tests
cd Sample/FlinkDotNet.Aspire.IntegrationTests
dotnet test

# Run specific test categories
dotnet test --filter "Category=stress_test"
dotnet test --filter "Category=complex_logic_test"
dotnet test --filter "Category=backpressure_test"

# Run with detailed output
dotnet test --logger "console;verbosity=detailed"

🏗️ Understanding Test Implementation

  1. Read BDD Scenarios: Start with .feature files to understand business requirements
  2. Examine Step Definitions: Review corresponding .cs files to see C# implementations
  3. Trace to Core Code: Follow step definitions to main FlinkJobBuilder and related classes
  4. Run and Debug: Execute tests with debugger to see the full flow

For complete examples of how BDD scenarios map to C# implementations, see:

  • [[Stress Tests Overview|Stress-Tests-Overview]] - Basic message processing examples
  • [[Complex Logic Stress Tests|Complex-Logic-Stress-Tests]] - Advanced integration patterns
  • Local Development: Use the [[Deployment Local|Deployment-Local]] environment for interactive development and debugging

External References: