Skip to content

Commit c9d5065

Browse files
authored
Bael 4466 error handling (eugenp#11269)
* Commit source code to branch * BAEL-5065 improvement of groupBy with complex key * BAEL-4466 Implementation of error handling
1 parent 6aabf32 commit c9d5065

10 files changed

Lines changed: 690 additions & 34 deletions

File tree

grpc/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@
3131
<artifactId>grpc-stub</artifactId>
3232
<version>${io.grpc.version}</version>
3333
</dependency>
34+
<dependency>
35+
<groupId>io.grpc</groupId>
36+
<artifactId>grpc-testing</artifactId>
37+
<version>${io.grpc.version}</version>
38+
<scope>test</scope>
39+
</dependency>
3440
<dependency>
3541
<groupId>junit</groupId>
3642
<artifactId>junit</artifactId>
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
package com.baeldung.grpc.errorhandling;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import com.baeldung.grpc.errorhandling.CommodityPriceProviderGrpc.CommodityPriceProviderStub;
10+
import com.google.protobuf.Any;
11+
import com.google.protobuf.InvalidProtocolBufferException;
12+
import com.google.rpc.Code;
13+
import com.google.rpc.ErrorInfo;
14+
15+
import io.grpc.Channel;
16+
import io.grpc.ManagedChannel;
17+
import io.grpc.ManagedChannelBuilder;
18+
import io.grpc.Status;
19+
import io.grpc.stub.StreamObserver;
20+
21+
public class CommodityClient {
22+
23+
private static final Logger logger = LoggerFactory.getLogger(CommodityClient.class.getName());
24+
25+
private final CommodityPriceProviderStub nonBlockingStub;
26+
27+
public CommodityClient(Channel channel) {
28+
29+
nonBlockingStub = CommodityPriceProviderGrpc.newStub(channel);
30+
}
31+
32+
33+
public void getBidirectionalCommodityPriceLists() throws InterruptedException {
34+
35+
logger.info("#######START EXAMPLE#######: BidirectionalStreaming - getCommodityPriceLists from list of commodities");
36+
final CountDownLatch finishLatch = new CountDownLatch(1);
37+
StreamObserver<StreamingCommodityQuote> responseObserver = new StreamObserver<StreamingCommodityQuote>() {
38+
@Override
39+
public void onNext(StreamingCommodityQuote streamingCommodityQuote) {
40+
41+
switch (streamingCommodityQuote.getMessageCase()) {
42+
case COMODITY_QUOTE:
43+
CommodityQuote commodityQuote = streamingCommodityQuote.getComodityQuote();
44+
logger.info("RESPONSE producer:" + commodityQuote.getCommodityName() + " price:" + commodityQuote.getPrice());
45+
break;
46+
case STATUS:
47+
com.google.rpc.Status status = streamingCommodityQuote.getStatus();
48+
logger.info("RESPONSE status error:");
49+
logger.info("Status code:" + Code.forNumber(status.getCode()));
50+
logger.info("Status message:" + status.getMessage());
51+
for (Any any : status.getDetailsList()) {
52+
if (any.is(ErrorInfo.class)) {
53+
ErrorInfo errorInfo;
54+
try {
55+
errorInfo = any.unpack(ErrorInfo.class);
56+
logger.info("Reason:" + errorInfo.getReason());
57+
logger.info("Domain:" + errorInfo.getDomain());
58+
logger.info("Insert Token:" + errorInfo.getMetadataMap().get("insertToken"));
59+
} catch (InvalidProtocolBufferException e) {
60+
logger.error(e.getMessage());
61+
}
62+
}
63+
}
64+
break;
65+
default:
66+
logger.info("Unknow message case");
67+
}
68+
}
69+
70+
@Override
71+
public void onCompleted() {
72+
logger.info("Finished getBidirectionalCommodityPriceListss");
73+
finishLatch.countDown();
74+
}
75+
76+
@Override
77+
public void onError(Throwable t) {
78+
logger.error("getBidirectionalCommodityPriceLists Failed:" + Status.fromThrowable(t));
79+
finishLatch.countDown();
80+
}
81+
};
82+
StreamObserver<Commodity> requestObserver = nonBlockingStub.bidirectionalListOfPrices(responseObserver);
83+
try {
84+
for (int i = 1; i <= 2; i++) {
85+
Commodity request = Commodity.newBuilder()
86+
.setCommodityName("Commodity" + i)
87+
.setAccessToken(i + "23validToken")
88+
.build();
89+
logger.info("REQUEST - commodity:" + request.getCommodityName());
90+
requestObserver.onNext(request);
91+
Thread.sleep(200);
92+
if (finishLatch.getCount() == 0) {
93+
return;
94+
}
95+
}
96+
} catch (RuntimeException e) {
97+
requestObserver.onError(e);
98+
throw e;
99+
}
100+
requestObserver.onCompleted();
101+
102+
if (!finishLatch.await(1, TimeUnit.MINUTES)) {
103+
logger.info("getBidirectionalCommodityPriceLists can not finish within 1 minute");
104+
}
105+
}
106+
107+
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
108+
109+
String target = "localhost:8980";
110+
if (args.length > 0) {
111+
target = args[0];
112+
}
113+
114+
ManagedChannel channel = ManagedChannelBuilder.forTarget(target)
115+
.usePlaintext()
116+
.build();
117+
try {
118+
CommodityClient client = new CommodityClient(channel);
119+
120+
client.getBidirectionalCommodityPriceLists();
121+
122+
} finally {
123+
channel.shutdownNow()
124+
.awaitTermination(5, TimeUnit.SECONDS);
125+
}
126+
}
127+
}
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
package com.baeldung.grpc.errorhandling;
2+
3+
import java.io.IOException;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ThreadLocalRandom;
7+
import java.util.concurrent.TimeUnit;
8+
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
import com.google.protobuf.Any;
13+
import com.google.rpc.Code;
14+
import com.google.rpc.ErrorInfo;
15+
16+
import io.grpc.Metadata;
17+
import io.grpc.Server;
18+
import io.grpc.ServerBuilder;
19+
import io.grpc.Status;
20+
import io.grpc.protobuf.ProtoUtils;
21+
import io.grpc.protobuf.StatusProto;
22+
import io.grpc.stub.StreamObserver;
23+
24+
public class CommodityServer {
25+
26+
private static final Logger logger = LoggerFactory.getLogger(CommodityServer.class.getName());
27+
private final int port;
28+
private final Server server;
29+
private static Map<String, Double> commodityLookupBasePrice;
30+
static {
31+
commodityLookupBasePrice = new ConcurrentHashMap<>();
32+
commodityLookupBasePrice.put("Commodity1", 5.0);
33+
commodityLookupBasePrice.put("Commodity2", 6.0);
34+
}
35+
36+
public static void main(String[] args) throws Exception {
37+
38+
CommodityServer commodityServer = new CommodityServer(8980);
39+
commodityServer.start();
40+
if (commodityServer.server != null) {
41+
commodityServer.server.awaitTermination();
42+
}
43+
}
44+
45+
public CommodityServer(int port) throws IOException {
46+
this.port = port;
47+
server = ServerBuilder.forPort(port)
48+
.addService(new CommodityService())
49+
.build();
50+
}
51+
52+
public void start() throws IOException {
53+
server.start();
54+
logger.info("Server started, listening on {}", port);
55+
Runtime.getRuntime()
56+
.addShutdownHook(new Thread() {
57+
@Override
58+
public void run() {
59+
System.err.println("shutting down server");
60+
try {
61+
CommodityServer.this.stop();
62+
} catch (InterruptedException e) {
63+
e.printStackTrace(System.err);
64+
}
65+
System.err.println("server shutted down");
66+
}
67+
});
68+
}
69+
70+
public void stop() throws InterruptedException {
71+
if (server != null) {
72+
server.shutdown()
73+
.awaitTermination(30, TimeUnit.SECONDS);
74+
}
75+
}
76+
77+
public static class CommodityService extends CommodityPriceProviderGrpc.CommodityPriceProviderImplBase {
78+
79+
@Override
80+
public void getBestCommodityPrice(Commodity request, StreamObserver<CommodityQuote> responseObserver) {
81+
82+
if (commodityLookupBasePrice.get(request.getCommodityName()) == null) {
83+
84+
Metadata.Key<ErrorResponse> errorResponseKey = ProtoUtils.keyForProto(ErrorResponse.getDefaultInstance());
85+
ErrorResponse errorResponse = ErrorResponse.newBuilder()
86+
.setCommodityName(request.getCommodityName())
87+
.setAccessToken(request.getAccessToken())
88+
.setExpectedValue("Only Commodity1, Commodity2 are supported")
89+
.build();
90+
Metadata metadata = new Metadata();
91+
metadata.put(errorResponseKey, errorResponse);
92+
responseObserver.onError(Status.INVALID_ARGUMENT.withDescription("The commodity is not supported")
93+
.asRuntimeException(metadata));
94+
} else if (request.getAccessToken().equals("123validToken") == false) {
95+
96+
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
97+
.setCode(Code.NOT_FOUND.getNumber())
98+
.setMessage("The access token not found")
99+
.addDetails(Any.pack(ErrorInfo.newBuilder()
100+
.setReason("Invalid Token")
101+
.setDomain("com.baeldung.grpc.errorhandling")
102+
.putMetadata("insertToken", "123validToken")
103+
.build()))
104+
.build();
105+
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
106+
} else {
107+
CommodityQuote commodityQuote = CommodityQuote.newBuilder()
108+
.setPrice(fetchBestPriceBid(request))
109+
.setCommodityName(request.getCommodityName())
110+
.setProducerName("Best Producer with best price")
111+
.build();
112+
responseObserver.onNext(commodityQuote);
113+
responseObserver.onCompleted();
114+
}
115+
}
116+
117+
@Override
118+
public StreamObserver<Commodity> bidirectionalListOfPrices(StreamObserver<StreamingCommodityQuote> responseObserver) {
119+
120+
return new StreamObserver<Commodity>() {
121+
@Override
122+
public void onNext(Commodity request) {
123+
124+
logger.info("Access token:{}", request.getAccessToken());
125+
if (request.getAccessToken()
126+
.equals("123validToken") == false) {
127+
128+
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
129+
.setCode(Code.NOT_FOUND.getNumber())
130+
.setMessage("The access token not found")
131+
.addDetails(Any.pack(ErrorInfo.newBuilder()
132+
.setReason("Invalid Token")
133+
.setDomain("com.baeldung.grpc.errorhandling")
134+
.putMetadata("insertToken", "123validToken")
135+
.build()))
136+
.build();
137+
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
138+
.setStatus(status)
139+
.build();
140+
responseObserver.onNext(streamingCommodityQuote);
141+
} else {
142+
143+
for (int i = 1; i <= 5; i++) {
144+
CommodityQuote commodityQuote = CommodityQuote.newBuilder()
145+
.setPrice(fetchProviderPriceBid(request, "producer:" + i))
146+
.setCommodityName(request.getCommodityName())
147+
.setProducerName("producer:" + i)
148+
.build();
149+
StreamingCommodityQuote streamingCommodityQuote = StreamingCommodityQuote.newBuilder()
150+
.setComodityQuote(commodityQuote)
151+
.build();
152+
responseObserver.onNext(streamingCommodityQuote);
153+
}
154+
}
155+
}
156+
157+
@Override
158+
public void onCompleted() {
159+
responseObserver.onCompleted();
160+
}
161+
162+
@Override
163+
public void onError(Throwable t) {
164+
logger.info("error:{}", t.getMessage());
165+
}
166+
};
167+
}
168+
169+
}
170+
171+
private static double fetchBestPriceBid(Commodity commodity) {
172+
173+
return commodityLookupBasePrice.get(commodity.getCommodityName()) + ThreadLocalRandom.current()
174+
.nextDouble(-0.2d, 0.2d);
175+
}
176+
177+
private static double fetchProviderPriceBid(Commodity commodity, String providerName) {
178+
179+
return commodityLookupBasePrice.get(commodity.getCommodityName()) + providerName.length() + ThreadLocalRandom.current()
180+
.nextDouble(-0.2d, 0.2d);
181+
}
182+
}

0 commit comments

Comments
 (0)