How do I use the Java Util Concurrent Flow API?

The java.util.concurrent.Flow API, introduced in Java 9, is a set of low-level interfaces that mirror the Reactive Streams specification. It provides the building blocks for asynchronous, non-blocking stream processing with backpressure, so a consumer can limit how much data a producer sends it and handle large or variable amounts of data more efficiently.

Here’s a quick guide on how to use the java.util.concurrent.Flow API:


Key Interfaces in java.util.concurrent.Flow

The API comprises four core interfaces:

  1. Flow.Publisher:
    Represents the producer of data. It publishes items to one or more subscribers.

  2. Flow.Subscriber:
    Represents the consumer of data. It subscribes to a publisher to receive data.

  3. Flow.Subscription:
    Represents a link between a publisher and a subscriber, allowing the subscriber to control how much data it receives (backpressure).

  4. Flow.Processor:
    Both a subscriber and a publisher, used to transform or process elements as they flow through the stream.


Implementation Workflow

To use the Flow API, you need to implement these interfaces. Below is a step-by-step explanation:

1. Create a Publisher

  • The Flow.Publisher interface has a single method subscribe(Subscriber<? super T> subscriber).
  • The publisher is responsible for connecting with subscribers and managing their subscriptions.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimplePublisher implements Flow.Publisher<String> {
    private final String[] items = {"Item 1", "Item 2", "Item 3"};

    @Override
    public void subscribe(Flow.Subscriber<? super String> subscriber) {
        SubscriptionImpl subscription = new SubscriptionImpl(subscriber, items);
        subscriber.onSubscribe(subscription);
    }

    private static class SubscriptionImpl implements Flow.Subscription {
        private final Flow.Subscriber<? super String> subscriber;
        private final String[] items;
        private int currentIndex = 0;
        // True once the subscription is canceled or a terminal signal has been sent
        private boolean done = false;

        public SubscriptionImpl(Flow.Subscriber<? super String> subscriber, String[] items) {
            this.subscriber = subscriber;
            this.items = items;
        }

        // Note: this synchronous sketch is not thread-safe
        @Override
        public void request(long n) {
            if (done) {
                return;
            }
            if (n <= 0) {
                done = true;
                subscriber.onError(new IllegalArgumentException("Must request a positive number of items"));
                return;
            }

            for (long i = 0; i < n && currentIndex < items.length; i++) {
                if (done) {
                    return;
                }
                subscriber.onNext(items[currentIndex++]);
            }

            // onNext may call request() again, so guard against completing more than once
            if (!done && currentIndex == items.length) {
                done = true;
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            done = true;
        }
    }
}

2. Create a Subscriber

  • Implement the Flow.Subscriber interface (four methods) for receiving events from a publisher:
    • onSubscribe(Flow.Subscription subscription): Receives the subscription. No items are delivered until request() is called on it, so the first request is usually made here.
    • onNext(T item): Handle the next item of the stream.
    • onError(Throwable throwable): Handle any errors.
    • onComplete(): Called when the publisher finishes sending data.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;

public class SimpleSubscriber implements Flow.Subscriber<String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        System.out.println("Subscribed!");
        subscription.request(1); // Request the first item
    }

    @Override
    public void onNext(String item) {
        System.out.println("Received: " + item);
        subscription.request(1); // Request the next item
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println("Error: " + throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Complete!");
    }
}

3. Connect the Publisher to the Subscriber

  • Instantiate and link your Publisher and Subscriber.
package org.kodejava.util.concurrent;

public class FlowExample {
    public static void main(String[] args) {
        SimplePublisher publisher = new SimplePublisher();
        SimpleSubscriber subscriber = new SimpleSubscriber();

        publisher.subscribe(subscriber);
    }
}

4. (Optional) Create a Processor

  • A Processor acts as both a Subscriber to transform data from an upstream publisher and a Publisher to pass it downstream.
package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class UppercaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        submit(item.toUpperCase());
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        throwable.printStackTrace();
        close();
    }

    @Override
    public void onComplete() {
        System.out.println("Processing complete!");
        close();
    }
}
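
The processor only becomes useful once it sits between a source and a final subscriber. The following is a minimal sketch of that wiring, not a definitive implementation: the class names ProcessorPipelineSketch, Upper and CollectingSubscriber are made up for illustration, Upper repeats the UppercaseProcessor idea so the block compiles on its own, and a CountDownLatch is assumed as the way to wait for completion.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class ProcessorPipelineSketch {

    // Same idea as UppercaseProcessor, repeated here so the sketch compiles on its own
    static class Upper extends SubmissionPublisher<String>
            implements Flow.Processor<String, String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            submit(item.toUpperCase()); // forward the transformed item downstream
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            closeExceptionally(throwable); // propagate the failure downstream
        }

        @Override
        public void onComplete() {
            close(); // propagate completion downstream
        }
    }

    // Hypothetical terminal subscriber: stores items and releases a latch when the stream ends
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch finished = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            finished.countDown();
        }

        @Override
        public void onComplete() {
            finished.countDown();
        }
    }

    static List<String> run(List<String> input) {
        Upper processor = new Upper();
        CollectingSubscriber sink = new CollectingSubscriber();
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            processor.subscribe(sink);   // downstream link first, so no item is missed
            source.subscribe(processor); // then the upstream link
            input.forEach(source::submit);
        } // leaving the block closes the source, which sends onComplete down the chain
        try {
            if (!sink.finished.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Pipeline did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(sink.received);
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("hello", "flow", "api")));
    }
}
```

Subscribing the sink to the processor before connecting the processor to the source matters here, because a SubmissionPublisher only delivers items to subscribers that are already registered when submit() is called.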

Example with SubmissionPublisher

Java also provides a SubmissionPublisher class, an implementation of Flow.Publisher, which simplifies creating Publishers.

package org.kodejava.util.concurrent;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class SubmissionPublisherExample {
    public static void main(String[] args) throws InterruptedException {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                this.subscription = subscription;
                System.out.println("Subscribed!");
                subscription.request(1);
            }

            @Override
            public void onNext(String item) {
                System.out.println("Received: " + item);
                subscription.request(1);
            }

            @Override
            public void onError(Throwable throwable) {
                throwable.printStackTrace();
            }

            @Override
            public void onComplete() {
                System.out.println("Done!");
            }
        };

        publisher.subscribe(subscriber);

        System.out.println("Publishing data...");
        publisher.submit("Hello");
        publisher.submit("World");
        publisher.submit("!");

        publisher.close(); // Signal completion; buffered items are still delivered first
        Thread.sleep(100); // Crude wait: delivery runs asynchronously on daemon threads
    }
}

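Output of the Above Example

Because onSubscribe is invoked asynchronously, "Subscribed!" may appear before or after "Publishing data...". A typical run prints: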

Publishing data...
Subscribed!
Received: Hello
Received: World
Received: !
Done!

Points to Remember

  1. Backpressure:
    • The subscriber can control how many items it wants to receive using the request() method of Flow.Subscription.
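    • The publisher must never deliver more items than have been requested. What it does with items it cannot send yet (buffer, drop, or block the producer) is up to the publisher; SubmissionPublisher buffers them, and submit() blocks while a subscriber’s buffer is full.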
  2. Error Handling:
    • If something goes wrong, the onError() callback is invoked so you can handle the failure. It is a terminal signal: no further items and no onComplete() follow it.
  3. Completion:
    • Once all elements are processed, the publisher calls onComplete() to indicate the sequence is finished.
  4. Threading:
    • The Flow API itself doesn’t mandate specific threads for publishers or subscribers, but it is usually paired with asynchronous delivery (e.g., SubmissionPublisher hands items to an Executor, which in most environments is ForkJoinPool.commonPool() by default).
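
To make the backpressure point concrete, here is a small sketch in which the subscriber requests a fixed number of items once and never asks again. The class and method names (BoundedDemandSketch, receiveAtMost) are invented for this illustration, and it assumes a positive demand and an input small enough to fit in the publisher's default buffer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BoundedDemandSketch {

    // Submits every input item, but the subscriber asks for `demand` items once and never again.
    // Assumes demand > 0 and a small input (well below the default buffer size of 256).
    static List<String> receiveAtMost(int demand, List<String> input) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch enough = new CountDownLatch(Math.min(demand, input.size()));

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    subscription.request(demand); // one-time request, never renewed
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                    enough.countDown();
                }

                @Override
                public void onError(Throwable throwable) {
                    throwable.printStackTrace();
                }

                @Override
                public void onComplete() {
                    // Not reached here: items remain buffered because demand is exhausted
                }
            });

            input.forEach(publisher::submit); // returns quickly: items wait in the buffer

            if (!enough.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Requested items did not arrive in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.copyOf(received);
    }

    public static void main(String[] args) {
        // Five items are submitted, but only the two requested ones are delivered
        System.out.println(receiveAtMost(2, List.of("a", "b", "c", "d", "e")));
    }
}
```

The remaining items stay in the publisher's buffer. A subscriber that wants them would have to call request() again, which is exactly the control that backpressure gives the consumer.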

By combining custom implementations with the provided SubmissionPublisher and additional libraries, you can build reactive systems that are both powerful and resource-efficient.

How do I use Collectors.filtering() introduced in Java 9?

In Java 9, the Collectors.filtering method was introduced to the Stream API as part of java.util.stream.Collectors. It allows you to apply a filter to elements of a stream before collecting them into a downstream collector (e.g., toList, toSet, etc.).

This can be particularly useful when you want to filter elements as part of the data collection pipeline.


Syntax

static <T, A, R> Collector<T, ?, R> filtering(Predicate<? super T> predicate, Collector<? super T, A, R> downstream)
  • predicate: A filter condition to be applied (e.g., a lambda expression).
  • downstream: The collector that will gather the filtered elements (e.g., Collectors.toList()).

How It Works

  1. The filtering method applies the specified Predicate to filter the elements of the stream.
  2. Only the elements that match the predicate are passed to the downstream collector.
  3. The filtered results are then collected as specified by the downstream collector.

Usage Example

Here’s a basic example of using Collectors.filtering:

Collecting only even integers from a list:

package org.kodejava.util.stream;

import java.util.List;
import java.util.stream.Collectors;

public class FilteringExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

        // Apply filtering before collecting to a list
        List<Integer> evenNumbers = numbers.stream()
                .collect(Collectors.filtering(n -> n % 2 == 0, Collectors.toList()));

        System.out.println("Even Numbers: " + evenNumbers);
    }
}

Output:

Even Numbers: [2, 4, 6, 8, 10]

Filtering with Downstream Grouping

You can use filtering in more complex collectors, such as those involving grouping. For example:

Grouping strings by their first character and filtering only strings longer than 3 characters:

package org.kodejava.util.stream;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FilteringWithGrouping {
    public static void main(String[] args) {
        List<String> words = List.of("apple", "ant", "banana", "bat", "cat", "car", "dog");

        // Group by the first character and filter words with length > 3
        Map<Character, List<String>> filteredWordsByGroup = words.stream()
                .collect(Collectors.groupingBy(
                        word -> word.charAt(0), // Grouping by the first character
                        Collectors.filtering(
                                word -> word.length() > 3, // Filter words with length > 3
                                Collectors.toList() // Collect filtered words into a list
                        )
                ));

        System.out.println("Filtered Words: " + filteredWordsByGroup);
    }
}

Output:

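Filtered Words: {a=[apple], b=[banana], c=[], d=[]}

Note that the keys c and d are still present, mapped to empty lists, because "cat", "car" and "dog" are only three characters long. Calling filter() before groupingBy() would drop those keys entirely.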
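
The difference between filtering inside the collector and filtering the stream beforehand is easiest to see side by side. The sketch below reuses the word list from the example; the class and method names (FilterVersusFiltering, filterThenGroup, groupThenFilter) are chosen only for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class FilterVersusFiltering {
    static final List<String> WORDS =
            List.of("apple", "ant", "banana", "bat", "cat", "car", "dog");

    // filter() runs before grouping, so letters without a long word never become keys
    static Map<Character, List<String>> filterThenGroup() {
        return WORDS.stream()
                .filter(word -> word.length() > 3)
                .collect(Collectors.groupingBy(word -> word.charAt(0)));
    }

    // filtering() runs inside each group, so every first letter stays as a key
    static Map<Character, List<String>> groupThenFilter() {
        return WORDS.stream()
                .collect(Collectors.groupingBy(
                        word -> word.charAt(0),
                        Collectors.filtering(word -> word.length() > 3, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(filterThenGroup()); // keys: a, b
        System.out.println(groupThenFilter()); // keys: a, b, c, d (c and d map to empty lists)
    }
}
```

Which variant is right depends on whether an empty group carries meaning in your data, for example "this category exists but nothing in it matched".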

When to Use

Collectors.filtering is particularly useful for:

  1. Grouped collections: Applying a filter within each group while keeping every group key, even when no element of that group matches.
  2. Custom collections: Collecting filtered elements into different collection types without needing an intermediate filtered stream.
  3. Improved readability: Keeps the condition next to the collector it applies to, which helps when several downstream collectors in one pipeline need different conditions.

Overall, Collectors.filtering makes streams more flexible and concise for advanced data collection scenarios!

How do I use Stream.ofNullable() for Optional Streams?

The Stream.ofNullable method in Java is a utility introduced in Java 9. It is used to create a stream from an object that may or may not be null. This is especially useful when dealing with optional values where you want to avoid manually checking if a value is null before creating a stream.

Here’s how Stream.ofNullable works:

  1. If the passed object is not null, it creates a stream containing that single element.
  2. If the passed object is null, it creates an empty stream.

This is particularly effective when you need to safely process nullable values in a stream pipeline without additional null checks.

Syntax:

static <T> Stream<T> ofNullable(T t)

Parameters:

  • t: The object that you want to create a stream from (nullable).

Returns:

  • A stream consisting of the specified element if it is non-null.
  • An empty stream if the element is null.

Example Usage:

Basic Example

package org.kodejava.util.stream;

import java.util.stream.Stream;

public class StreamOfNullableExample {
    public static void main(String[] args) {
        String value = "Hello, World!";
        Stream<String> stream1 = Stream.ofNullable(value);
        stream1.forEach(System.out::println); // Outputs: Hello, World!

        String nullValue = null;
        Stream<String> stream2 = Stream.ofNullable(nullValue);
        stream2.forEach(System.out::println); // Outputs nothing (empty stream)
    }
}

Combining with Other Stream Operations

package org.kodejava.util.stream;

import java.util.stream.Collectors;
import java.util.List;
import java.util.stream.Stream;

public class OptionalStreamExample {
    public static void main(String[] args) {
        String[] values = { "one", null, "three", null };

        // Collect all non-null values into a list
        List<String> nonNullValues = Stream.of(values)
            .flatMap(Stream::ofNullable) // Process each value safely, handling nulls
            .collect(Collectors.toList());

        System.out.println(nonNullValues); // Outputs: [one, three]
    }
}

Practical Example with Optional

When dealing with Optional values, you can use Stream.ofNullable to integrate them with other streams, although Optional.stream() (also added in Java 9) performs the same conversion directly.

package org.kodejava.util.stream;

import java.util.Optional;
import java.util.stream.Stream;

public class OptionalToStreamExample {
    public static void main(String[] args) {
        Optional<String> optionalValue = Optional.of("Hello, Optional!");

        // Convert Optional to Stream and process
        Stream<String> stream = Stream.ofNullable(optionalValue.orElse(null));
        stream.forEach(System.out::println); // Outputs: Hello, Optional!
    }
}
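
For comparison, here is a short sketch of both ways to turn an Optional into a stream. The class and method names (OptionalStreamSketch, viaOfNullable, viaOptionalStream) are hypothetical; both approaches should yield the same elements.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class OptionalStreamSketch {

    // Unwraps to a nullable value first, then lets ofNullable handle the null case
    static List<String> viaOfNullable(Optional<String> value) {
        return Stream.ofNullable(value.orElse(null)).collect(Collectors.toList());
    }

    // Optional.stream() yields one element when present and none when empty
    static List<String> viaOptionalStream(Optional<String> value) {
        return value.stream().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(viaOfNullable(Optional.of("Hello, Optional!")));
        System.out.println(viaOptionalStream(Optional.of("Hello, Optional!")));
        System.out.println(viaOptionalStream(Optional.empty()));
    }
}
```

Stream.ofNullable remains the better fit when the value comes from an API that returns null rather than an Optional.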

Key Highlights of Stream.ofNullable:

  • Avoids the need for null checks when creating streams for nullable values.
  • Simplifies stream pipelines where null handling is required.
  • Works well with flatMap to filter null values while processing arrays, collections, or optionals.
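
A common place where this pays off is a lookup that returns null for unknown keys, such as Map.get. The following sketch assumes a made-up STOCK table and a hypothetical quantitiesFor helper, purely for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NullableLookupSketch {
    // Hypothetical lookup table; Map.get returns null for unknown keys
    static final Map<String, Integer> STOCK = Map.of("apple", 12, "pear", 7);

    static List<Integer> quantitiesFor(List<String> names) {
        return names.stream()
                // A missing key yields null, which ofNullable turns into an empty stream
                .flatMap(name -> Stream.ofNullable(STOCK.get(name)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(quantitiesFor(List.of("apple", "kiwi", "pear"))); // [12, 7]
    }
}
```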

By using Stream.ofNullable, you can write cleaner, safer, and more concise code when dealing with nullable values in streams.

How do I use Collectors.flatMapping()?

Collectors.flatMapping is a utility method in the java.util.stream.Collectors class (introduced in Java 9) that combines the concepts of flattening a collection of collections and mapping elements into a single flattened stream of results.

Definition

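Collectors.flatMapping allows you to apply a mapping function that turns each element into a stream, and then feeds the contents of all those streams into a single downstream collector. The mapper must return a Stream, so a collection has to be converted first (for example with List::stream).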

The syntax looks like this:

static <T, U, A, R> Collector<T, ?, R> flatMapping(Function<? super T, ? extends Stream<? extends U>> mapper,
                                                   Collector<? super U, A, R> downstream)

Parameters:

  1. mapper: A function applied to each element of the stream to produce a stream of child elements.
  2. downstream: A collector used to collect the flattened elements produced by the mapper.

Key Use-Cases:

  • Flattening hierarchical data like lists of lists.
  • Transforming and collecting elements into a single, flattened collection.

Behavior

  1. Applies a mapping function to transform each element of the data set into a Stream.
  2. Flattens these streams into a single continuous stream.
  3. Uses the provided downstream collector to collect the flattened results.

Example Explanation

Suppose you have a Map of students and their enrolled subjects:

Map<String, List<String>> studentSubjects = Map.of(
    "Alice", List.of("Math", "Physics"),
    "Bob", List.of("Biology", "Chemistry"),
    "Charlie", List.of("Math", "History")
);

If you want to collect all the subjects in a flattened set (with no duplicates):

Set<String> allSubjects = studentSubjects.values().stream()
    .collect(Collectors.flatMapping(List::stream, Collectors.toSet()));

System.out.println(allSubjects);
// Possible output (the Set returned by toSet() has no guaranteed order):
// [Biology, Physics, History, Math, Chemistry]

Detailed Breakdown:

  1. Input: A Stream<List<String>> (from studentSubjects.values()).
  2. flatMapping:
    • It applies List::stream (mapping each List<String> into a Stream<String>).
    • Then flattens these child streams into a single stream of subjects.
  3. toSet: Collects the flattened stream into a Set (no duplicates allowed).

Comparing flatMapping to map

  • map applies a function to each element and keeps the results nested (for example, one Stream<String> per list); it does not flatten.
  • flatMapping combines both mapping and flattening into one step, which simplifies working with nested structures.

Example differences:

List<List<String>> lists = List.of(
    List.of("a", "b", "c"),
    List.of("d", "e"),
    List.of("f", "g", "h")
);

// Using flatMapping
Set<String> flatCollection = lists.stream()
    .collect(Collectors.flatMapping(List::stream, Collectors.toSet()));

// flatCollection contains a, b, c, d, e, f, g, h (set order is unspecified)

// Using map (no flattening)
List<Stream<String>> mappedStreams = lists.stream()
    .map(List::stream)
    .collect(Collectors.toList());

Custom Use Case and Comparisons

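Collectors.flatMapping is not limited to collections. Any type that can expose its contents as a stream works, for example Optional with its stream() method (also added in Java 9).

Example with a list of Optional values:

import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class CustomMonadExample {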

    public static void main(String[] args) {
        List<Optional<Integer>> optionalNumbers = List.of(
            Optional.of(1),
            Optional.of(2),
            Optional.empty(),
            Optional.of(4)
        );

        // Stream + flatMapping
        List<Integer> flatMappedNumbers = optionalNumbers.stream()
            .collect(Collectors.flatMapping(opt -> opt.stream(), Collectors.toList()));

        System.out.println(flatMappedNumbers); // Output: [1, 2, 4]
    }
}

Here:

  • Each Optional is mapped to its stream using opt.stream().
  • Then these streams are combined (flattened) into a list using flatMapping.

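Key Notes:

  • Use Collectors.flatMapping when your processing involves nested structures or sub-streams, and you need to combine everything into a single collection.
  • It is the collector counterpart of Stream.flatMap. At the top level of a pipeline, stream.flatMap(...).collect(...) is usually simpler; flatMapping is most useful as a downstream collector, for example inside groupingBy or partitioningBy.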
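
As an illustration of the downstream use, the sketch below groups enrollment records by student and flattens each student's subject lists into one set. The data shape (a list of Map.Entry pairs) and the names FlatMappingGroupingSketch and subjectsByStudent are assumptions made for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class FlatMappingGroupingSketch {

    // Each entry is one (student, subjects) record; a student may appear more than once
    static Map<String, Set<String>> subjectsByStudent(
            List<Map.Entry<String, List<String>>> enrollments) {
        return enrollments.stream()
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        // Flatten every subject list of the same student into a single set
                        Collectors.flatMapping(
                                entry -> entry.getValue().stream(),
                                Collectors.toSet())));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, List<String>>> enrollments = List.of(
                Map.entry("Alice", List.of("Math", "Physics")),
                Map.entry("Bob", List.of("Biology")),
                Map.entry("Alice", List.of("Math", "History")));

        // Alice ends up with Math, Physics and History; Bob with Biology
        System.out.println(subjectsByStudent(enrollments));
    }
}
```

A plain Stream.flatMap cannot express this directly, because flattening before groupingBy would lose the student that each subject belongs to.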

How do I use Stream.takeWhile() and Stream.dropWhile()?

The Stream.takeWhile and Stream.dropWhile methods were introduced in Java 9. These operations allow you to process a stream conditionally based on a predicate, controlling how many elements to take or discard from the start of the stream.

Here’s how they work:

Stream.takeWhile(predicate)

  • Operation: This method takes elements from the stream as long as the given predicate evaluates to true. It stops processing as soon as the predicate evaluates to false, even if there are more elements in the stream.
  • Key Point: It works on a lazily-evaluated stream and stops as soon as the predicate fails.

Example:

package org.kodejava.util.stream;

import java.util.List;
import java.util.stream.Collectors;

public class TakeWhileExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7);

        // Take numbers while they are less than 5
        List<Integer> result = numbers.stream()
                                      .takeWhile(n -> n < 5) // Stop as soon as an element >= 5
                                      .collect(Collectors.toList());

        System.out.println(result); // Output: [1, 2, 3, 4]
    }
}

Stream.dropWhile(predicate)

  • Operation: This method discards elements from the stream as long as the given predicate evaluates to true. Once the predicate evaluates to false, it will take the rest of the elements (even if they later match the predicate again).
  • Key Point: Opposite to takeWhile, it skips the matching elements first, and continues from where the condition becomes false.

Example:

package org.kodejava.util.stream;

import java.util.List;
import java.util.stream.Collectors;

public class DropWhileExample {
    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6, 7);

        // Drop numbers while they are less than 5
        List<Integer> result = numbers.stream()
                                      .dropWhile(n -> n < 5) // Skip elements < 5; start when n >= 5
                                      .collect(Collectors.toList());

        System.out.println(result); // Output: [5, 6, 7]
    }
}
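
Both examples above use a sorted list, where takeWhile and filter would give the same result. The following sketch uses an unsorted list instead to show that takeWhile and dropWhile only look at the leading run of matching elements. The class and method names are invented for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

class PrefixVersusFilter {
    static final List<Integer> NUMBERS = List.of(1, 2, 5, 3, 4);

    // Stops at 5, so the later 3 and 4 are never reached
    static List<Integer> taken() {
        return NUMBERS.stream().takeWhile(n -> n < 5).collect(Collectors.toList());
    }

    // Skips 1 and 2, then keeps everything, including 3 and 4
    static List<Integer> dropped() {
        return NUMBERS.stream().dropWhile(n -> n < 5).collect(Collectors.toList());
    }

    // Tests every element independently of its position
    static List<Integer> filtered() {
        return NUMBERS.stream().filter(n -> n < 5).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(taken());    // [1, 2]
        System.out.println(dropped());  // [5, 3, 4]
        System.out.println(filtered()); // [1, 2, 3, 4]
    }
}
```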

Differences Between takeWhile and dropWhile

Aspect               | takeWhile                                                   | dropWhile
Purpose              | Takes elements until the predicate fails.                   | Skips elements until the predicate fails.
Predicate evaluation | Stops at the first failure; the stream ends there.          | Stops at the first failure; all remaining elements pass through untested.
Returned elements    | Elements satisfying the predicate, up to the first failure. | Elements from the first failure onward.

Notes:

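  1. Order-sensitive: These methods operate on the stream’s encounter order. On an unordered stream where only some elements match the predicate, the result is nondeterministic: any subset of the matching elements may be taken or dropped.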
  2. Early stopping: takeWhile works efficiently because it short-circuits the moment the predicate fails.
  3. Infinite streams: takeWhile turns an infinite stream into a finite one as soon as the predicate fails. dropWhile leaves the stream infinite, so it needs a later short-circuiting operation such as limit().

Example with Infinite Stream:

package org.kodejava.util.stream;

import java.util.stream.Stream;
import java.util.List;
import java.util.stream.Collectors;

public class InfiniteStreamExample {
    public static void main(String[] args) {
        List<Integer> taken = Stream.iterate(1, n -> n + 1)
                                    .takeWhile(n -> n <= 5) // Stops when n > 5
                                    .collect(Collectors.toList());

        System.out.println(taken); // Output: [1, 2, 3, 4, 5]
    }
}
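
For dropWhile on an infinite stream, a bounding operation is needed afterwards. A minimal sketch, assuming a hypothetical helper named firstAfterThreshold:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DropWhileInfiniteSketch {

    // Returns `count` consecutive integers starting at `threshold` (for threshold >= 1)
    static List<Integer> firstAfterThreshold(int threshold, int count) {
        return Stream.iterate(1, n -> n + 1)
                .dropWhile(n -> n < threshold) // skips 1 .. threshold - 1
                .limit(count)                  // required: the remaining stream is still infinite
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstAfterThreshold(5, 3)); // [5, 6, 7]
    }
}
```

Without the limit() call the collect step would never finish, because dropWhile only removes the leading elements and passes everything after them through.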

With these tools, you can write concise and declarative stream-processing logic.