11package io .github .kimmking .gateway .outbound .httpclient4 ;
22
33
4+ import io .github .kimmking .gateway .filter .HttpRequestFilter ;
5+ import io .github .kimmking .gateway .router .HttpEndpointRouter ;
6+ import io .github .kimmking .gateway .router .RandomHttpEndpointRouter ;
47import io .netty .buffer .Unpooled ;
58import io .netty .channel .ChannelFutureListener ;
69import io .netty .channel .ChannelHandlerContext ;
1720import org .apache .http .protocol .HTTP ;
1821import org .apache .http .util .EntityUtils ;
1922
23+ import java .util .List ;
24+ import java .util .Random ;
2025import java .util .concurrent .*;
26+ import java .util .logging .Filter ;
27+ import java .util .stream .Collectors ;
2128
2229import static io .netty .handler .codec .http .HttpResponseStatus .NO_CONTENT ;
2330import static io .netty .handler .codec .http .HttpResponseStatus .OK ;
@@ -27,10 +34,14 @@ public class HttpOutboundHandler {
2734
2835 private CloseableHttpAsyncClient httpclient ;
2936 private ExecutorService proxyService ;
30- private String backendUrl ;
31-
32- public HttpOutboundHandler (String backendUrl ){
33- this .backendUrl = backendUrl .endsWith ("/" )?backendUrl .substring (0 ,backendUrl .length ()-1 ):backendUrl ;
37+ private List <String > backendUrls ;
38+
39+ HttpEndpointRouter router = new RandomHttpEndpointRouter ();
40+
41+ public HttpOutboundHandler (List <String > backends ) {
42+
43+ this .backendUrls = backendUrls .stream ().map (this ::formatUrl ).collect (Collectors .toList ());
44+
3445 int cores = Runtime .getRuntime ().availableProcessors () * 2 ;
3546 long keepAliveTime = 1000 ;
3647 int queueSize = 2048 ;
@@ -53,16 +64,24 @@ public HttpOutboundHandler(String backendUrl){
5364 .build ();
5465 httpclient .start ();
5566 }
67+
68+ private String formatUrl (String backend ) {
69+ return backend .endsWith ("/" )?backend .substring (0 ,backend .length ()-1 ):backend ;
70+ }
5671
57- public void handle (final FullHttpRequest fullRequest , final ChannelHandlerContext ctx ) {
58- final String url = this .backendUrl + fullRequest .uri ();
72+ public void handle (final FullHttpRequest fullRequest , final ChannelHandlerContext ctx , HttpRequestFilter filter ) {
73+ String backendUrl = router .route (this .backendUrls );
74+ final String url = backendUrl + fullRequest .uri ();
75+ filter .filter (fullRequest , ctx );
5976 proxyService .submit (()->fetchGet (fullRequest , ctx , url ));
6077 }
6178
6279 private void fetchGet (final FullHttpRequest inbound , final ChannelHandlerContext ctx , final String url ) {
6380 final HttpGet httpGet = new HttpGet (url );
6481 //httpGet.setHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE);
6582 httpGet .setHeader (HTTP .CONN_DIRECTIVE , HTTP .CONN_KEEP_ALIVE );
83+ httpGet .setHeader ("mao" , inbound .headers ().get ("mao" ));
84+
6685 httpclient .execute (httpGet , new FutureCallback <HttpResponse >() {
6786 @ Override
6887 public void completed (final HttpResponse endpointResponse ) {
0 commit comments