The java.util.concurrent.Flow API, introduced in Java 9, is a low-level implementation of the Reactive Streams specification, providing an asynchronous, non-blocking framework for handling streams of data. It allows you to build reactive systems that support backpressure to handle large or variable amounts of data more efficiently.
Here’s a quick guide on how to use java.util.concurrent.Flow API:
Key Interfaces in java.util.concurrent.Flow
The API comprises four core interfaces:
Flow.Publisher:
Represents the producer of data. It publishes items to one or more subscribers.-
Flow.Subscriber:
Represents the consumer of data. It subscribes to a publisher to receive data. -
Flow.Subscription:
Represents a link between a publisher and a subscriber, allowing the subscriber to control how much data it receives (backpressure). -
Flow.Processor:
Both a subscriber and a publisher, used to transform or process elements as they flow through the stream.
Implementation Workflow
To use the Flow API, you need to implement these interfaces. Below is a step-by-step explanation:
1. Create a Publisher
- The
Flow.Publisherinterface has a single methodsubscribe(Subscriber<? super T> subscriber). - The publisher is responsible for connecting with subscribers and managing their subscriptions.
package org.kodejava.util.concurrent;
import java.util.concurrent.Flow;
public class SimplePublisher implements Flow.Publisher<String> {
private final String[] items = {"Item 1", "Item 2", "Item 3"};
@Override
public void subscribe(Flow.Subscriber<? super String> subscriber) {
SubscriptionImpl subscription = new SubscriptionImpl(subscriber, items);
subscriber.onSubscribe(subscription);
}
private static class SubscriptionImpl implements Flow.Subscription {
private final Flow.Subscriber<? super String> subscriber;
private final String[] items;
private int currentIndex = 0;
private boolean canceled = false;
public SubscriptionImpl(Flow.Subscriber<? super String> subscriber, String[] items) {
this.subscriber = subscriber;
this.items = items;
}
@Override
public void request(long n) {
if (n <= 0) {
subscriber.onError(new IllegalArgumentException("Must request a positive number of items"));
return;
}
for (int i = 0; i < n && currentIndex < items.length; i++) {
if (canceled) {
return;
}
subscriber.onNext(items[currentIndex++]);
}
if (currentIndex == items.length) {
subscriber.onComplete();
}
}
@Override
public void cancel() {
canceled = true;
}
}
}
2. Create a Subscriber
- Implement the
Flow.Subscriberinterface (four methods) for receiving events from a publisher:onSubscribe(Flow.Subscription subscription): Receive the subscription. You must request data here.onNext(T item): Handle the next item of the stream.onError(Throwable throwable): Handle any errors.onComplete(): Called when the publisher finishes sending data.
package org.kodejava.util.concurrent;
import java.util.concurrent.Flow;
public class SimpleSubscriber implements Flow.Subscriber<String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscribed!");
subscription.request(1); // Request the first item
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
subscription.request(1); // Request the next item
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
@Override
public void onComplete() {
System.out.println("Complete!");
}
}
3. Connect the Publisher to the Subscriber
- Instantiate and link your
PublisherandSubscriber.
package org.kodejava.util.concurrent;
public class FlowExample {
public static void main(String[] args) {
SimplePublisher publisher = new SimplePublisher();
SimpleSubscriber subscriber = new SimpleSubscriber();
publisher.subscribe(subscriber);
}
}
4. (Optional) Create a Processor
- A
Processoracts as both aSubscriberto transform data from an upstream publisher and aPublisherto pass it downstream.
package org.kodejava.util.concurrent;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class UppercaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String item) {
submit(item.toUpperCase());
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
close();
}
@Override
public void onComplete() {
System.out.println("Processing complete!");
close();
}
}
Example with SubmissionPublisher
Java also provides a SubmissionPublisher class, an implementation of Flow.Publisher, which simplifies creating Publishers.
package org.kodejava.util.concurrent;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
public static void main(String[] args) throws InterruptedException {
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
System.out.println("Subscribed!");
subscription.request(1);
}
@Override
public void onNext(String item) {
System.out.println("Received: " + item);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done!");
}
};
publisher.subscribe(subscriber);
System.out.println("Publishing data...");
publisher.submit("Hello");
publisher.submit("World");
publisher.submit("!");
Thread.sleep(100); // Allow time for processing
publisher.close();
}
}
Output of the Above Example
Publishing data...
Subscribed!
Received: Hello
Received: World
Received: !
Done!
Points to Remember
- Backpressure:
- The subscriber can control how many items it wants to receive using the
request()method ofFlow.Subscription. - If the subscriber requests fewer items, the publisher will slow down and send only the requested number.
- The subscriber can control how many items it wants to receive using the
- Error Handling:
- If something goes wrong, the
onError()callback is invoked, allowing you to handle errors gracefully.
- If something goes wrong, the
- Completion:
- Once all elements are processed, the publisher calls
onComplete()to indicate the sequence is finished.
- Once all elements are processed, the publisher calls
- Threading:
- The Flow API itself doesn’t mandate the use of specific threads for dealing with publishers/subscribers, but it’s often paired with asynchronous mechanisms (e.g., the
SubmissionPublisheruses a default ForkJoinPool to process items).
- The Flow API itself doesn’t mandate the use of specific threads for dealing with publishers/subscribers, but it’s often paired with asynchronous mechanisms (e.g., the
By combining custom implementations with the provided SubmissionPublisher and additional libraries, you can build reactive systems that are both powerful and resource-efficient.
