Skip to content

Commit 9e7c4c9

Browse files
committed
提交第三周作业
1 parent 6943786 commit 9e7c4c9

8 files changed

Lines changed: 221 additions & 12 deletions

File tree

02nio/nio02/README.md

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
# netty-gateway
22

3-
```
4-
3+
第一题答案:
4+
OkhttpOutboundHandler
5+
6+
第三题答案:
7+
TokenBucketHttpRequestFilter
8+
9+
第四题答案:
10+
RandHttpEndPointRouter
511

6-
```

02nio/nio02/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,18 @@
5757
<groupId>org.projectlombok</groupId>
5858
<artifactId>lombok</artifactId>
5959
</dependency>
60+
61+
<dependency>
62+
<groupId>com.squareup.okhttp3</groupId>
63+
<artifactId>okhttp</artifactId>
64+
<version>3.12.0</version>
65+
</dependency>
66+
67+
<dependency>
68+
<groupId>com.google.guava</groupId>
69+
<artifactId>guava</artifactId>
70+
<version>21.0</version>
71+
</dependency>
6072

6173
<!--
6274
<dependency>

02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HeaderHttpRequestFilter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,9 @@ public class HeaderHttpRequestFilter implements HttpRequestFilter {
99
public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
1010
fullRequest.headers().set("mao", "soul");
1111
}
12+
13+
@Override
14+
public boolean filter2(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
15+
return false;
16+
}
1217
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/filter/HttpRequestFilter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@
66
public interface HttpRequestFilter {
77

88
void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx);
9+
10+
boolean filter2(FullHttpRequest fullRequest, ChannelHandlerContext ctx);
911

1012
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package io.github.kimmking.gateway.filter;
2+
3+
import com.google.common.util.concurrent.RateLimiter;
4+
import io.netty.channel.ChannelHandlerContext;
5+
import io.netty.handler.codec.http.FullHttpRequest;
6+
7+
/**
8+
* 使用guava令牌桶算法限流
9+
*
10+
* @author lizhe
11+
*/
12+
public class TokenBucketHttpRequestFilter implements HttpRequestFilter {
13+
14+
private static RateLimiter rateLimiter = RateLimiter.create(1.0);
15+
16+
@Override
17+
public void filter(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
18+
19+
}
20+
21+
@Override
22+
public boolean filter2(FullHttpRequest fullRequest, ChannelHandlerContext ctx) {
23+
return rateLimiter.tryAcquire();
24+
}
25+
}

02nio/nio02/src/main/java/io/github/kimmking/gateway/inbound/HttpInboundHandler.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
import io.github.kimmking.gateway.filter.HeaderHttpRequestFilter;
44
import io.github.kimmking.gateway.filter.HttpRequestFilter;
5-
import io.github.kimmking.gateway.outbound.httpclient4.HttpOutboundHandler;
5+
import io.github.kimmking.gateway.filter.TokenBucketHttpRequestFilter;
6+
import io.github.kimmking.gateway.outbound.okhttp.OkhttpOutboundHandler;
67
import io.netty.channel.ChannelHandlerContext;
78
import io.netty.channel.ChannelInboundHandlerAdapter;
89
import io.netty.handler.codec.http.FullHttpRequest;
@@ -16,14 +17,15 @@ public class HttpInboundHandler extends ChannelInboundHandlerAdapter {
1617

1718
private static Logger logger = LoggerFactory.getLogger(HttpInboundHandler.class);
1819
private final List<String> proxyServer;
19-
private HttpOutboundHandler handler;
20-
private HttpRequestFilter filter = new HeaderHttpRequestFilter();
21-
20+
private OkhttpOutboundHandler handler;
21+
private HttpRequestFilter filter = new TokenBucketHttpRequestFilter();
22+
2223
public HttpInboundHandler(List<String> proxyServer) {
2324
this.proxyServer = proxyServer;
24-
this.handler = new HttpOutboundHandler(this.proxyServer);
25+
// this.handler = new HttpOutboundHandler(this.proxyServer);
26+
this.handler = new OkhttpOutboundHandler(this.proxyServer);
2527
}
26-
28+
2729
@Override
2830
public void channelReadComplete(ChannelHandlerContext ctx) {
2931
ctx.flush();
@@ -39,10 +41,10 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
3941
// if (uri.contains("/test")) {
4042
// handlerTest(fullRequest, ctx);
4143
// }
42-
44+
4345
handler.handle(fullRequest, ctx, filter);
44-
45-
} catch(Exception e) {
46+
47+
} catch (Exception e) {
4648
e.printStackTrace();
4749
} finally {
4850
ReferenceCountUtil.release(msg);
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,132 @@
11
package io.github.kimmking.gateway.outbound.okhttp;
22

3+
import io.github.kimmking.gateway.filter.HttpRequestFilter;
4+
import io.github.kimmking.gateway.outbound.httpclient4.NamedThreadFactory;
5+
import io.github.kimmking.gateway.router.HttpEndpointRouter;
6+
import io.github.kimmking.gateway.router.RandHttpEndPointRouter;
7+
import io.netty.buffer.Unpooled;
8+
import io.netty.channel.ChannelFutureListener;
9+
import io.netty.channel.ChannelHandlerContext;
10+
import io.netty.handler.codec.http.DefaultFullHttpResponse;
11+
import io.netty.handler.codec.http.FullHttpRequest;
12+
import io.netty.handler.codec.http.FullHttpResponse;
13+
import io.netty.handler.codec.http.HttpUtil;
14+
import okhttp3.*;
15+
16+
import java.io.IOException;
17+
import java.util.List;
18+
import java.util.concurrent.*;
19+
20+
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
21+
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
22+
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
23+
324
public class OkhttpOutboundHandler {
25+
26+
private OkHttpClient client;
27+
private ExecutorService proxyService;
28+
private List<String> backendUrls;
29+
private HttpEndpointRouter router;
30+
31+
public OkhttpOutboundHandler(List<String> backendUrls) {
32+
this.backendUrls = backendUrls;
33+
34+
ConnectionPool pool = new ConnectionPool(5, 10, TimeUnit.MINUTES);
35+
client = new OkHttpClient().newBuilder().readTimeout(2000, TimeUnit.MILLISECONDS)
36+
.connectionPool(pool).retryOnConnectionFailure(false).followRedirects(true).build();
37+
38+
int cores = Runtime.getRuntime().availableProcessors() * 2;
39+
long keepAliveTime = 1000;
40+
int queueSize = 2048;
41+
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
42+
proxyService = new ThreadPoolExecutor(cores, cores,
43+
keepAliveTime, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(queueSize),
44+
new NamedThreadFactory("proxyService"), handler);
45+
46+
router = new RandHttpEndPointRouter();
47+
}
48+
49+
private String formatUrl(String backend) {
50+
return backend.endsWith("/") ? backend.substring(0, backend.length() - 1) : backend;
51+
}
52+
53+
54+
public void handle(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, HttpRequestFilter filter)
55+
throws Exception {
56+
57+
String backendUrl = router.route(this.backendUrls);
58+
final String url = backendUrl + fullRequest.uri();
59+
60+
if (filter.filter2(fullRequest, ctx)) {
61+
62+
proxyService.submit(() -> fetchGet(fullRequest, ctx, url));
63+
} else {
64+
limited(ctx);
65+
}
66+
}
67+
68+
private void fetchGet(final FullHttpRequest inbound, final ChannelHandlerContext ctx, final String url) {
69+
try {
70+
71+
final Request request = new Request.Builder().url(url).get().build();
72+
final Call call = client.newCall(request);
73+
call.enqueue(new Callback() {
74+
@Override
75+
public void onFailure(Call call, IOException e) {
76+
e.printStackTrace();
77+
}
78+
79+
@Override
80+
public void onResponse(Call call, Response response) throws IOException {
81+
try {
82+
handleResponse(inbound, ctx, response);
83+
} catch (Exception e) {
84+
e.printStackTrace();
85+
}
86+
}
87+
});
88+
}catch (Exception e){
89+
e.printStackTrace();
90+
}
91+
}
92+
93+
private void limited(final ChannelHandlerContext ctx) {
94+
byte[] bytes = "被限流...".getBytes();
95+
FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(bytes));
96+
97+
response.headers().set("Content-Type", "application/json");
98+
response.headers().setInt("Content-Length", bytes.length);
99+
100+
ctx.write(response);
101+
ctx.flush();
102+
}
103+
104+
private void handleResponse(final FullHttpRequest fullRequest, final ChannelHandlerContext ctx, final Response endpointResponse) throws Exception {
105+
FullHttpResponse response = null;
106+
try {
107+
String body = endpointResponse.body().string();
108+
System.out.println(body);
109+
byte[] bodys = body.getBytes();
110+
111+
response = new DefaultFullHttpResponse(HTTP_1_1, OK, Unpooled.wrappedBuffer(bodys));
112+
113+
response.headers().set("Content-Type", "application/json");
114+
response.headers().setInt("Content-Length", Integer.parseInt(endpointResponse.header("Content-Length")));
115+
116+
117+
} catch (Exception e) {
118+
e.printStackTrace();
119+
response = new DefaultFullHttpResponse(HTTP_1_1, NO_CONTENT);
120+
ctx.close();
121+
} finally {
122+
if (fullRequest != null) {
123+
if (!HttpUtil.isKeepAlive(fullRequest)) {
124+
ctx.write(response).addListener(ChannelFutureListener.CLOSE);
125+
} else {
126+
ctx.write(response);
127+
}
128+
}
129+
ctx.flush();
130+
}
131+
}
4132
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.github.kimmking.gateway.router;
2+
3+
import java.util.List;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
/**
7+
* @author lizhe
8+
*/
9+
public class RandHttpEndPointRouter implements HttpEndpointRouter {
10+
11+
private AtomicInteger atomicInteger = new AtomicInteger(0);
12+
13+
14+
@Override
15+
public String route(List<String> endpoints) {
16+
17+
final int modulo = incrementAndGetModulo();
18+
return endpoints.get(modulo % endpoints.size());
19+
}
20+
21+
public final int incrementAndGetModulo() {
22+
int current;
23+
int next;
24+
do {
25+
current = this.atomicInteger.get();
26+
next = current >= Integer.MAX_VALUE ? 0 : current + 1;
27+
} while (!this.atomicInteger.compareAndSet(current, next));
28+
return next;
29+
}
30+
}

0 commit comments

Comments
 (0)