Skip to content

Commit e505326

Browse files
authored
Bael 5119 streaming in g rpc (eugenp#11215)
* Commit source code to branch * BAEL-5065 improvement of groupBy with complex key * Streaming in gRPC
1 parent 2cfbd8d commit e505326

4 files changed

Lines changed: 437 additions & 75 deletions

File tree

grpc/pom.xml

Lines changed: 76 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -1,81 +1,82 @@
11
<?xml version="1.0" encoding="UTF-8"?>
22
<project xmlns="http://maven.apache.org/POM/4.0.0"
3-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5-
<modelVersion>4.0.0</modelVersion>
6-
<artifactId>grpc</artifactId>
7-
<version>0.0.1-SNAPSHOT</version>
8-
<name>grpc</name>
9-
<packaging>jar</packaging>
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
<artifactId>grpc</artifactId>
7+
<version>0.0.1-SNAPSHOT</version>
8+
<name>grpc</name>
9+
<packaging>jar</packaging>
1010

11-
<parent>
12-
<groupId>com.baeldung</groupId>
13-
<artifactId>parent-modules</artifactId>
14-
<version>1.0.0-SNAPSHOT</version>
15-
</parent>
11+
<parent>
12+
<groupId>com.baeldung</groupId>
13+
<artifactId>parent-modules</artifactId>
14+
<version>1.0.0-SNAPSHOT</version>
15+
</parent>
1616

17-
<dependencies>
18-
<dependency>
19-
<groupId>io.grpc</groupId>
20-
<artifactId>grpc-netty</artifactId>
21-
<version>${io.grpc.version}</version>
22-
</dependency>
23-
<dependency>
24-
<groupId>io.grpc</groupId>
25-
<artifactId>grpc-protobuf</artifactId>
26-
<version>${io.grpc.version}</version>
27-
</dependency>
28-
<dependency>
29-
<groupId>io.grpc</groupId>
30-
<artifactId>grpc-stub</artifactId>
31-
<version>${io.grpc.version}</version>
32-
</dependency>
33-
<dependency>
34-
<groupId>junit</groupId>
35-
<artifactId>junit</artifactId>
36-
<version>${junit.version}</version>
37-
<scope>test</scope>
38-
</dependency>
39-
</dependencies>
40-
41-
<build>
42-
<extensions>
43-
<extension>
44-
<groupId>kr.motd.maven</groupId>
45-
<artifactId>os-maven-plugin</artifactId>
46-
<version>${os-maven-plugin.version}</version>
47-
</extension>
48-
</extensions>
49-
<plugins>
50-
<plugin>
51-
<groupId>org.xolstice.maven.plugins</groupId>
52-
<artifactId>protobuf-maven-plugin</artifactId>
53-
<version>${protobuf-maven-plugin.version}</version>
54-
<configuration>
55-
<protocArtifact>
56-
com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
57-
</protocArtifact>
58-
<pluginId>grpc-java</pluginId>
59-
<pluginArtifact>
60-
io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
61-
</pluginArtifact>
62-
</configuration>
63-
<executions>
64-
<execution>
65-
<goals>
66-
<goal>compile</goal>
67-
<goal>compile-custom</goal>
68-
</goals>
69-
</execution>
70-
</executions>
71-
</plugin>
72-
</plugins>
73-
</build>
74-
75-
<properties>
76-
<io.grpc.version>1.16.1</io.grpc.version>
77-
<os-maven-plugin.version>1.6.1</os-maven-plugin.version>
78-
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
79-
</properties>
17+
<dependencies>
18+
<dependency>
19+
<groupId>io.grpc</groupId>
20+
<artifactId>grpc-netty-shaded</artifactId>
21+
<scope>runtime</scope>
22+
<version>${io.grpc.version}</version>
23+
</dependency>
24+
<dependency>
25+
<groupId>io.grpc</groupId>
26+
<artifactId>grpc-protobuf</artifactId>
27+
<version>${io.grpc.version}</version>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.grpc</groupId>
31+
<artifactId>grpc-stub</artifactId>
32+
<version>${io.grpc.version}</version>
33+
</dependency>
34+
<dependency>
35+
<groupId>junit</groupId>
36+
<artifactId>junit</artifactId>
37+
<version>${junit.version}</version>
38+
<scope>test</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>javax.annotation</groupId>
42+
<artifactId>javax.annotation-api</artifactId>
43+
<version>1.2</version>
44+
</dependency>
45+
</dependencies>
8046

47+
<build>
48+
<extensions>
49+
<extension>
50+
<groupId>kr.motd.maven</groupId>
51+
<artifactId>os-maven-plugin</artifactId>
52+
<version>${os-maven-plugin.version}</version>
53+
</extension>
54+
</extensions>
55+
<plugins>
56+
<plugin>
57+
<groupId>org.xolstice.maven.plugins</groupId>
58+
<artifactId>protobuf-maven-plugin</artifactId>
59+
<version>${protobuf-maven-plugin.version}</version>
60+
<configuration>
61+
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
62+
<pluginId>grpc-java</pluginId>
63+
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${io.grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
64+
</configuration>
65+
<executions>
66+
<execution>
67+
<goals>
68+
<goal>compile</goal>
69+
<goal>compile-custom</goal>
70+
</goals>
71+
</execution>
72+
</executions>
73+
</plugin>
74+
</plugins>
75+
</build>
76+
<properties>
77+
<io.grpc.version>1.40.1</io.grpc.version>
78+
<protoc.version>3.17.2</protoc.version>
79+
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
80+
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
81+
</properties>
8182
</project>
Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,183 @@
1+
package com.baeldung.grpc.streaming;
2+
3+
import java.util.Arrays;
4+
import java.util.Iterator;
5+
import java.util.List;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.logging.Level;
9+
import java.util.logging.Logger;
10+
11+
import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderBlockingStub;
12+
import com.baeldung.grpc.streaming.StockQuoteProviderGrpc.StockQuoteProviderStub;
13+
14+
import io.grpc.Channel;
15+
import io.grpc.ManagedChannel;
16+
import io.grpc.ManagedChannelBuilder;
17+
import io.grpc.Status;
18+
import io.grpc.StatusRuntimeException;
19+
import io.grpc.stub.StreamObserver;
20+
21+
public class StockClient {
22+
private static final Logger logger = Logger.getLogger(StockClient.class.getName());
23+
24+
private final StockQuoteProviderBlockingStub blockingStub;
25+
private final StockQuoteProviderStub nonBlockingStub;
26+
private List<Stock> stocks;
27+
28+
public StockClient(Channel channel) {
29+
30+
blockingStub = StockQuoteProviderGrpc.newBlockingStub(channel);
31+
nonBlockingStub = StockQuoteProviderGrpc.newStub(channel);
32+
initializeStocks();
33+
}
34+
35+
public void serverSideStreamingListOfStockPrices() {
36+
37+
logInfo("######START EXAMPLE######: ServerSideStreaming - list of Stock prices from a given stock");
38+
Stock request = Stock.newBuilder()
39+
.setTickerSymbol("AU")
40+
.setCompanyName("Austich")
41+
.setDescription("server streaming example")
42+
.build();
43+
Iterator<StockQuote> stockQuotes;
44+
try {
45+
logInfo("REQUEST - ticker symbol {0}", request.getTickerSymbol());
46+
stockQuotes = blockingStub.serverSideStreamingGetListStockQuotes(request);
47+
for (int i = 1; stockQuotes.hasNext(); i++) {
48+
StockQuote stockQuote = stockQuotes.next();
49+
logInfo("RESPONSE - Price #" + i + ": {0}", stockQuote.getPrice());
50+
}
51+
} catch (StatusRuntimeException e) {
52+
logInfo("RPC failed: {0}", e.getStatus());
53+
}
54+
}
55+
56+
public void clientSideStreamingGetStatisticsOfStocks() throws InterruptedException {
57+
58+
logInfo("######START EXAMPLE######: ClientSideStreaming - getStatisticsOfStocks from a list of stocks");
59+
final CountDownLatch finishLatch = new CountDownLatch(1);
60+
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
61+
@Override
62+
public void onNext(StockQuote summary) {
63+
logInfo("RESPONSE, got stock statistics - Average Price: {0}, description: {1}", summary.getPrice(), summary.getDescription());
64+
}
65+
66+
@Override
67+
public void onCompleted() {
68+
logInfo("Finished clientSideStreamingGetStatisticsOfStocks");
69+
finishLatch.countDown();
70+
}
71+
72+
@Override
73+
public void onError(Throwable t) {
74+
logWarning("Stock Statistics Failed: {0}", Status.fromThrowable(t));
75+
finishLatch.countDown();
76+
}
77+
};
78+
79+
StreamObserver<Stock> requestObserver = nonBlockingStub.clientSideStreamingGetStatisticsOfStocks(responseObserver);
80+
try {
81+
82+
for (Stock stock : stocks) {
83+
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
84+
requestObserver.onNext(stock);
85+
if (finishLatch.getCount() == 0) {
86+
return;
87+
}
88+
}
89+
} catch (RuntimeException e) {
90+
requestObserver.onError(e);
91+
throw e;
92+
}
93+
requestObserver.onCompleted();
94+
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
95+
logWarning("clientSideStreamingGetStatisticsOfStocks can not finish within 1 minutes");
96+
}
97+
}
98+
99+
public void bidirectionalStreamingGetListsStockQuotes() throws InterruptedException{
100+
101+
logInfo("#######START EXAMPLE#######: BidirectionalStreaming - getListsStockQuotes from list of stocks");
102+
final CountDownLatch finishLatch = new CountDownLatch(1);
103+
StreamObserver<StockQuote> responseObserver = new StreamObserver<StockQuote>() {
104+
@Override
105+
public void onNext(StockQuote stockQuote) {
106+
logInfo("RESPONSE price#{0} : {1}, description:{2}", stockQuote.getOfferNumber(), stockQuote.getPrice(), stockQuote.getDescription());
107+
}
108+
109+
@Override
110+
public void onCompleted() {
111+
logInfo("Finished bidirectionalStreamingGetListsStockQuotes");
112+
finishLatch.countDown();
113+
}
114+
115+
@Override
116+
public void onError(Throwable t) {
117+
logWarning("bidirectionalStreamingGetListsStockQuotes Failed: {0}", Status.fromThrowable(t));
118+
finishLatch.countDown();
119+
}
120+
};
121+
StreamObserver<Stock> requestObserver = nonBlockingStub.bidirectionalStreamingGetListsStockQuotes(responseObserver);
122+
try {
123+
for (Stock stock : stocks) {
124+
logInfo("REQUEST: {0}, {1}", stock.getTickerSymbol(), stock.getCompanyName());
125+
requestObserver.onNext(stock);
126+
Thread.sleep(200);
127+
if (finishLatch.getCount() == 0) {
128+
return;
129+
}
130+
}
131+
} catch (RuntimeException e) {
132+
requestObserver.onError(e);
133+
throw e;
134+
}
135+
requestObserver.onCompleted();
136+
137+
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
138+
logWarning("bidirectionalStreamingGetListsStockQuotes can not finish within 1 minute");
139+
}
140+
141+
}
142+
143+
public static void main(String[] args) throws InterruptedException {
144+
String target = "localhost:8980";
145+
if (args.length > 0) {
146+
target = args[0];
147+
}
148+
149+
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
150+
.usePlaintext()
151+
.build();
152+
try {
153+
StockClient client = new StockClient(channel);
154+
155+
client.serverSideStreamingListOfStockPrices();
156+
157+
client.clientSideStreamingGetStatisticsOfStocks();
158+
159+
client.bidirectionalStreamingGetListsStockQuotes();
160+
161+
} finally {
162+
channel.shutdownNow()
163+
.awaitTermination(5, TimeUnit.SECONDS);
164+
}
165+
}
166+
167+
private void initializeStocks() {
168+
169+
this.stocks = Arrays.asList(Stock.newBuilder().setTickerSymbol("AU").setCompanyName("Auburn Corp").setDescription("Aptitude Intel").build()
170+
, Stock.newBuilder().setTickerSymbol("BAS").setCompanyName("Bassel Corp").setDescription("Business Intel").build()
171+
, Stock.newBuilder().setTickerSymbol("COR").setCompanyName("Corvine Corp").setDescription("Corporate Intel").build()
172+
, Stock.newBuilder().setTickerSymbol("DIA").setCompanyName("Dialogic Corp").setDescription("Development Intel").build()
173+
, Stock.newBuilder().setTickerSymbol("EUS").setCompanyName("Euskaltel Corp").setDescription("English Intel").build());
174+
}
175+
176+
private void logInfo(String msg, Object... params) {
177+
logger.log(Level.INFO, msg, params);
178+
}
179+
180+
private void logWarning(String msg, Object... params) {
181+
logger.log(Level.WARNING, msg, params);
182+
}
183+
}

0 commit comments

Comments
 (0)