chankit

Elegant, Type-Safe Channel Operations for Go

Transform the way you work with Go channels using modern, composable pipelines


Documentation • Features • Installation • Quick Start • Examples • API Reference


🎯 What is chankit?

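chankit is a Go library that brings functional programming patterns to Go channels. Built on generics, it replaces hand-written goroutine and channel plumbing with readable, composable pipelines. Note that the pipeline Map method returns any (hence the type assertions in the examples); MapTo is the fully type-safe variant.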

From This...

// Traditional Go: Verbose, nested, hard to maintain
ch := make(chan int)
go func() {
    for i := 1; i <= 100; i++ {
        ch <- i
    }
    close(ch)
}()

ch2 := make(chan int)
go func() {
    defer close(ch2)
    for v := range ch {
        ch2 <- v * v
    }
}()

result := []int{}
for v := range ch2 {
    if v%2 == 0 {
        result = append(result, v)
    }
}

...To This!

// chankit: Clean, expressive, maintainable
result := chankit.RangePipeline(ctx, 1, 101, 1).
    Map(func(x int) any { return x * x }).
    Filter(func(x any) bool { return x.(int)%2 == 0 }).
    ToSlice()

✨ Features

🎛️ Flow Control

Control the flow of data with powerful timing operators

  • Throttle - Rate limiting with value dropping
  • Debounce - Wait for silence before emitting
  • Batch - Group values by size or timeout
  • FixedInterval - Consistent value pacing

🔄 Transformations

Transform and process data functionally

  • Map - Transform values with type safety
  • Filter - Select values by predicate
  • Reduce - Aggregate to single result
  • FlatMap - Transform and flatten

🏭 Generators

Create channels from various sources

  • Range - Numeric sequences
  • Generate - Custom generator functions
  • Repeat - Infinite value repetition
  • FromSlice - Convert slices to channels

⚡ Production Ready

Built for real-world use

  • Context-Aware - Respects cancellation
  • Type-Safe - Full generic support
  • Well-Tested - Comprehensive coverage
  • Pipeline API - Modern fluent interface

📦 Installation

go get github.com/utkarsh5026/chankit

import "github.com/utkarsh5026/chankit/chankit"

🚀 Quick Start

The Pipeline Pattern

package main

import (
    "context"
    "fmt"
    "github.com/utkarsh5026/chankit/chankit"
)

func main() {
    ctx := context.Background()

    // Create a pipeline: numbers 1-100, square them, keep evens, take first 10
    result := chankit.RangePipeline(ctx, 1, 101, 1).
        Map(func(x int) any { return x * x }).
        Filter(func(x any) bool { return x.(int)%2 == 0 }).
        Take(10).
        ToSlice()

    fmt.Println(result)
    // Output: [4 16 36 64 100 144 196 256 324 400]
}

💡 Real-World Examples

📊 Example 1: Data Processing Pipeline

Scenario: Process a stream of numbers - square them, filter for even results, skip the first 10, and take the next 20.

package main

import (
    "context"
    "fmt"
    "github.com/utkarsh5026/chankit/chankit"
)

func main() {
    ctx := context.Background()

    // Process numbers 1-100
    result := chankit.RangePipeline(ctx, 1, 101, 1).
        Map(func(x int) any { return x * x }).           // Square each number
        Filter(func(x any) bool {                         // Keep only even squares
            return x.(int)%2 == 0
        }).
        Skip(10).                                         // Skip first 10 results
        Take(20).                                         // Take next 20
        ToSlice()                                         // Collect to slice

    fmt.Printf("Processed %d values: %v\n", len(result), result)
}

Output:

Processed 20 values: [484 576 676 784 900 1024 1156 1296 1444 1600 1764 1936 2116 2304 2500 2704 2916 3136 3364 3600]

🔄 Example 2: Event Stream Processing

Scenario: Build a real-time event processing system that debounces user input, filters events, and batches them for processing.

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/utkarsh5026/chankit/chankit"
)

type Event struct {
    Type      string
    Timestamp time.Time
    Data      string
}

func main() {
    ctx := context.Background()

    // Simulate event stream
    events := make(chan Event, 100)
    go func() {
        defer close(events)
        for i := 0; i < 50; i++ {
            events <- Event{
                Type:      "click",
                Timestamp: time.Now(),
                Data:      fmt.Sprintf("event-%d", i),
            }
            time.Sleep(10 * time.Millisecond)
        }
    }()

    // Process events: debounce, filter, and batch
    batches := chankit.From(ctx, events).
        Debounce(50 * time.Millisecond).                  // Wait for 50ms silence
        Filter(func(e Event) bool {                       // Filter specific events
            return e.Type == "click"
        }).
        Batch(5, 200*time.Millisecond)                    // Batch 5 or every 200ms

    // Process batches
    for batch := range batches {
        fmt.Printf("Processing batch of %d events\n", len(batch))
    }
}

🔀 Example 3: Data Transformation Stream

Scenario: Transform user data by filtering active users, extracting emails, and processing in batches.

package main

import (
    "context"
    "fmt"
    "strings"
    "time"
    "github.com/utkarsh5026/chankit/chankit"
)

type User struct {
    ID     int
    Name   string
    Email  string
    Active bool
}

func main() {
    ctx := context.Background()

    // Sample users
    users := []User{
        {1, "Alice", "[email protected]", true},
        {2, "Bob", "[email protected]", false},
        {3, "Charlie", "[email protected]", true},
        {4, "Diana", "[email protected]", true},
        {5, "Eve", "[email protected]", false},
    }

    // Transform: active users → emails → uppercase → batch
    batches := chankit.FromSlice(ctx, users).
        Filter(func(u User) bool { return u.Active }).    // Active users only
        Map(func(u User) any { return u.Email }).         // Extract emails
        Map(func(e any) any {                             // Uppercase emails
            return strings.ToUpper(e.(string))
        }).
        Batch(2, 100*time.Millisecond)                    // Batch for processing

    // Process email batches
    for batch := range batches {
        fmt.Printf("Email batch: %v\n", batch)
    }
    // Output:
    // Email batch: [[email protected] [email protected]]
    // Email batch: [[email protected]]
}

📈 Example 4: Real-Time Sensor Data Analytics

Scenario: Process high-frequency sensor readings, debounce them to reduce noise, transform values, and calculate running averages.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"
    "github.com/utkarsh5026/chankit/chankit"
)

type SensorReading struct {
    SensorID  string
    Value     float64
    Timestamp time.Time
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    // Simulate sensor data stream
    readings := make(chan SensorReading, 100)
    go func() {
        defer close(readings)
        for {
            select {
            case <-ctx.Done():
                return
            case readings <- SensorReading{
                SensorID:  "temp-01",
                Value:     20 + rand.Float64()*10,
                Timestamp: time.Now(),
            }:
                time.Sleep(10 * time.Millisecond)
            }
        }
    }()

    // Process: debounce → transform (simulated) → calculate average
    processed := chankit.From(ctx, readings).
        Debounce(100 * time.Millisecond).                 // Reduce noise
        Map(func(r SensorReading) any {                   // Transform value
            return r.Value * 1.5 // Simulate transformation
        }).
        Take(10)                                          // Take first 10

    // Calculate average
    sum := 0.0
    count := 0
    processed.ForEach(func(v any) {
        sum += v.(float64)
        count++
        fmt.Printf("Reading #%d: %.2f\n", count, v.(float64))
    })

    if count > 0 {
        fmt.Printf("\nAverage: %.2f\n", sum/float64(count))
    }
}

🔍 Example 5: Search-As-You-Type with Debouncing

Scenario: Implement an efficient search system that waits for the user to stop typing before executing the search.

package main

import (
    "context"
    "fmt"
    "strings"
    "time"
    "github.com/utkarsh5026/chankit/chankit"
)

func performSearch(query string) []string {
    // Simulate search results
    database := []string{"apple", "application", "apply", "banana", "band", "bandana"}
    results := []string{}
    for _, item := range database {
        if strings.Contains(strings.ToLower(item), strings.ToLower(query)) {
            results = append(results, item)
        }
    }
    return results
}

func main() {
    ctx := context.Background()

    // Simulate user typing
    userInput := make(chan string, 10)
    go func() {
        defer close(userInput)
        queries := []string{"a", "ap", "app", "appl", "apple"}
        for _, q := range queries {
            userInput <- q
            time.Sleep(50 * time.Millisecond)
        }
    }()

    // Debounce and search
    results := chankit.From(ctx, userInput).
        Debounce(200 * time.Millisecond).                 // Wait for typing to stop
        Filter(func(q string) bool { return len(q) >= 2 }). // Min 2 chars
        Map(func(q string) any {                          // Perform search
            fmt.Printf("Searching for: %s\n", q)
            return performSearch(q)
        })

    // Display results
    results.ForEach(func(r any) {
        fmt.Printf("Results: %v\n", r)
    })
}

⚡ Example 6: Rate-Limited API Calls

Scenario: Make API calls at a controlled rate to avoid hitting rate limits.

package main

import (
    "context"
    "fmt"
    "time"
    "github.com/utkarsh5026/chankit/chankit"
)

type APIRequest struct {
    ID       int
    Endpoint string
}

func callAPI(req APIRequest) string {
    // Simulate API call
    return fmt.Sprintf("Response for request %d", req.ID)
}

func main() {
    ctx := context.Background()

    // Create 20 API requests
    requests := make([]APIRequest, 20)
    for i := 0; i < 20; i++ {
        requests[i] = APIRequest{
            ID:       i + 1,
            Endpoint: "/api/data",
        }
    }

    // Process at most 10 requests per second (100ms interval)
    fmt.Println("Making rate-limited API calls...")
    start := time.Now()

    chankit.FromSlice(ctx, requests).
        FixedInterval(100 * time.Millisecond).            // 10 per second max
        Tap(func(req APIRequest) {                        // Log progress
            fmt.Printf("[%s] Processing request %d\n",
                time.Since(start).Round(time.Millisecond), req.ID)
        }).
        Map(func(req APIRequest) any {                    // Make API call
            return callAPI(req)
        }).
        ForEach(func(resp any) {                          // Handle responses
            // Process response
        })

    fmt.Printf("Completed in %s\n", time.Since(start).Round(time.Millisecond))
}

📚 API Reference

🔨 Creating Pipelines

| Method | Description | Example |
|---|---|---|
| RangePipeline(ctx, start, end, step) | Create from numeric range | RangePipeline(ctx, 1, 100, 1) |
| FromSlice(ctx, slice) | Create from slice | FromSlice(ctx, []int{1,2,3}) |
| From(ctx, channel) | Create from existing channel | From(ctx, myChan) |
| NewPipeline[T](ctx) | Create empty pipeline | NewPipeline[int](ctx) |

🔄 Transformation Methods

| Method | Description | Example |
|---|---|---|
| Map(fn) | Transform each value | .Map(func(x int) any { return x * 2 }) |
| MapTo[R](fn) | Type-safe map | MapTo(p, func(x int) string { ... }) |
| Filter(fn) | Keep matching values | .Filter(func(x int) bool { return x > 10 }) |
| FlatMap(fn) | Transform and flatten | .FlatMap(func(x int) <-chan int { ... }) |

🎯 Selection Methods

| Method | Description | Example |
|---|---|---|
| Take(n) | Take first N values | .Take(10) |
| Skip(n) | Skip first N values | .Skip(5) |
| TakeWhile(fn) | Take while predicate true | .TakeWhile(func(x int) bool { return x < 100 }) |
| SkipWhile(fn) | Skip while predicate true | .SkipWhile(func(x int) bool { return x < 0 }) |
| First() | Get first value | value, ok := pipeline.First() |
| Last() | Get last value | value, ok := pipeline.Last() |

🎛️ Flow Control Methods

| Method | Description | Use Case | Example |
|---|---|---|---|
| Throttle(duration) | Emit at fixed intervals, drop extras | UI updates, high-frequency events | .Throttle(100*time.Millisecond) |
| Debounce(duration) | Wait for silence before emitting | Search boxes, form validation | .Debounce(300*time.Millisecond) |
| FixedInterval(duration) | Pace values without dropping | Rate-limited API calls | .FixedInterval(100*time.Millisecond) |
| Batch(size, timeout) | Group into batches | Bulk database inserts | .Batch(100, 5*time.Second) |

🔀 Combining Pipelines

| Method | Description | Example |
|---|---|---|
| Merge(channels...) | Combine multiple channels | p1.Merge(p2.Chan(), p3.Chan()) |
| ZipWith(other) | Pair values from two channels | ZipWith(p1, p2.Chan()) |

🏁 Terminal Operations

| Method | Description | Blocking | Example |
|---|---|---|---|
| ToSlice() | Collect all values | Yes | result := pipeline.ToSlice() |
| Reduce(fn, initial) | Aggregate values | Yes | .Reduce(func(a,b int) int { return a+b }, 0) |
| Count() | Count values | Yes | count := pipeline.Count() |
| ForEach(fn) | Execute for each | Yes | .ForEach(func(x int) { fmt.Println(x) }) |
| Any(fn) | Check if any match | Yes | .Any(func(x int) bool { return x > 10 }) |
| All(fn) | Check if all match | Yes | .All(func(x int) bool { return x > 0 }) |
| Chan() | Get underlying channel | No | ch := pipeline.Chan() |

👁️ Observation Methods

| Method | Description | Example |
|---|---|---|
| Tap(fn) | Observe without modifying | .Tap(func(x int) { log.Println(x) }) |

🎨 Advanced Patterns

Pattern 1: Complex Data Pipeline

// Multi-stage data processing with logging
result := chankit.RangePipeline(ctx, 1, 1000, 1).
    Tap(func(x int) { log.Printf("Input: %d", x) }).
    Filter(func(x int) bool { return x%2 == 0 }).
    Tap(func(x int) { log.Printf("After filter: %d", x) }).
    Map(func(x int) any { return x * x }).
    Skip(10).
    Take(20).
    Tap(func(x any) { log.Printf("Final: %v", x) }).
    ToSlice()

Pattern 2: Fan-Out/Fan-In

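// Process data through multiple parallel pipelines
ctx := context.Background()

// Fan-out: give each branch its own source. A Go channel hands each value to
// exactly one receiver, so two filters sharing one source would likely split
// the stream between them instead of each seeing every value.
evens := chankit.RangePipeline(ctx, 1, 100, 1).Filter(func(x int) bool { return x%2 == 0 })
odds := chankit.RangePipeline(ctx, 1, 100, 1).Filter(func(x int) bool { return x%2 != 0 })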

// Fan-in: merge results
merged := evens.Merge(odds.Chan())
result := merged.ToSlice()

Pattern 3: Conditional Processing

// Different processing based on value
result := chankit.FromSlice(ctx, data).
    Map(func(x int) any {
        if x > 100 {
            return x * 2
        } else if x > 50 {
            return float64(x) * 1.5 // x is an int; convert before multiplying by 1.5
        }
        return x
    }).
    ToSlice()

🛑 Context Cancellation

All operations respect context cancellation for graceful shutdowns:

// Timeout after 5 seconds
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

result := chankit.RangePipeline(ctx, 1, 1000000, 1).
    Map(func(x int) any { return expensiveOperation(x) }).
    ToSlice() // Stops after 5 seconds

// Manual cancellation (a separate snippet: ctx and cancel are declared afresh here)
ctx, cancel := context.WithCancel(context.Background())
go func() {
    time.Sleep(1 * time.Second)
    cancel() // Stop all operations
}()

pipeline := chankit.From(ctx, channel).
    Map(func(x int) any { return process(x) })

⚙️ Configuration & Performance

Buffering

Control channel buffer sizes for better performance:

// Custom buffer size for high-throughput
ch := chankit.Map(ctx, input, fn, chankit.WithBuffer[int](100))

// Auto-sized buffer for SliceToChan
ch := chankit.SliceToChan(ctx, slice, chankit.WithBufferAuto[int]())

Pre-allocation

Optimize memory when collecting:

// Pre-allocate if you know the size
slice := chankit.ChanToSlice(ctx, ch, chankit.WithCapacity[int](1000))

Performance Tips

  1. Use buffered channels for high-throughput scenarios
  2. Pre-allocate slices when you know the expected size
  3. Choose the right flow control:
    • Throttle - Drop values for rate limiting
    • Debounce - Wait for activity to stop
    • FixedInterval - Preserve all values
    • Batch - Group for bulk operations

🧪 Testing

# Run all tests
go test ./...

# Run with coverage
go test -cover ./...

# Run specific test
go test -v -run TestThrottle ./chankit

# Benchmark
go test -bench=. ./chankit

📊 Why chankit?

❌ Standard Library

// Verbose, error-prone
ch := make(chan int)
go func() {
    defer close(ch)
    for i := 0; i < 10; i++ {
        ch <- i * 2
    }
}()

result := []int{}
for v := range ch {
    if v%4 == 0 {
        result = append(result, v)
    }
}

Issues:

  • Manual goroutine management
  • Error-prone close handling
  • Hard to compose operations
  • No context support built-in
  • Verbose and repetitive

✅ chankit

// Clean, declarative
result := chankit.RangePipeline(ctx, 0, 10, 1).
    Map(func(x int) any { return x * 2 }).
    Filter(func(x any) bool {
        return x.(int)%4 == 0
    }).
    ToSlice()

Advantages:

  • Declarative, readable code
  • Automatic resource management
  • Easy composition and chaining
  • Built-in context support
  • Type-safe with generics
  • Production-tested patterns

Key Benefits

  • Higher-level abstractions - Map, Filter, Reduce for functional programming
  • Flow control utilities - Throttle, Debounce, Batch for common patterns
  • Type-safe generics - Compile-time type checking
  • Composability - Easy chaining for complex pipelines
  • Production-tested - Comprehensive test coverage

🤝 Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

📄 License

MIT License - see LICENSE file for details.


Made with ❤️ for the Go community

⭐ Star this repo if you find it useful!


chankit - Transform channels, elevate code