Skip to content

Commit e08dfcc

Browse files
psevestremaibin
authored andcommitted
[BAEL-3562] AWS S3 with Java - Reactive Support (eugenp#8309)
* [BAEL-3164] Add spring-boot-jdbi module * [BAEL-3164] Remove extra files * [BAEL-3164] Update springboot main dependency * Reset bad commit * [BAEL-3562] Added basic code * [BAEL-3562] Some refatoring * [BAEL-3562] More refatoring * [BAEL-3562] LiveTests
1 parent b15d47b commit e08dfcc

18 files changed

Lines changed: 925 additions & 0 deletions
22.2 KB
Loading
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
participant "Client 1" as C1
2+
participant "Client 2" as C2
3+
participant "Reactive Web App" as RWS
4+
participant "Backend" as S3
5+
C1 -> RWS: POST
6+
activate C1
7+
activate RWS
8+
RWS -> S3: Async POST
9+
deactivate RWS
10+
C2 -> RWS: POST
11+
activate C2
12+
activate RWS
13+
RWS -> S3: Async POST
14+
deactivate RWS
15+
S3 --> RWS: Async Result
16+
activate RWS
17+
RWS -->C2: Result
18+
deactivate RWS
19+
deactivate C2
20+
// First file EOF
21+
S3 --> RWS: Async Result
22+
activate RWS
23+
RWS -->C1: Result
24+
deactivate RWS
25+
deactivate C1
26+
27+
28+
29+
31.1 KB
Loading
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
participant Client 1
2+
participant Client 2
3+
participant Controller
4+
participant Backend
5+
Client 1-> Controller: POST Data
6+
activate Client 1
7+
activate Controller
8+
Controller -> Backend: Save Data
9+
activate Backend
10+
note left of Controller #yellow: Controller blocked\nuntil result received
11+
Backend --> Controller: Result
12+
deactivate Backend
13+
Controller --> Client 1: Result
14+
deactivate Client 1
15+
deactivate Controller
16+
// 2nd Upload
17+
Client 2-> Controller: POST Data
18+
activate Client 2
19+
activate Controller
20+
Controller -> Backend: Save Data
21+
activate Backend
22+
note left of Controller #yellow: Controller blocket\nuntil result received
23+
Backend --> Controller: Result
24+
deactivate Backend
25+
Controller --> Client 2: Result
26+
deactivate Controller
27+
deactivate Client 2
28+

aws-reactive/pom.xml

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<parent>
8+
<groupId>com.baeldung</groupId>
9+
<artifactId>parent-modules</artifactId>
10+
<version>1.0.0-SNAPSHOT</version>
11+
</parent>
12+
13+
<artifactId>aws-reactive</artifactId>
14+
<version>0.0.1-SNAPSHOT</version>
15+
<name>aws-reactive</name>
16+
<description>AWS Reactive Sample</description>
17+
18+
<properties>
19+
<java.version>1.8</java.version>
20+
</properties>
21+
22+
<dependencyManagement>
23+
<dependencies>
24+
25+
<dependency>
26+
<!-- Import dependency management from Spring Boot -->
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-dependencies</artifactId>
29+
<version>2.2.1.RELEASE</version>
30+
<type>pom</type>
31+
<scope>import</scope>
32+
</dependency>
33+
34+
<dependency>
35+
<groupId>software.amazon.awssdk</groupId>
36+
<artifactId>bom</artifactId>
37+
<version>2.10.27</version>
38+
<type>pom</type>
39+
<scope>import</scope>
40+
</dependency>
41+
</dependencies>
42+
</dependencyManagement>
43+
44+
<dependencies>
45+
<dependency>
46+
<groupId>org.springframework.boot</groupId>
47+
<artifactId>spring-boot-starter-webflux</artifactId>
48+
</dependency>
49+
50+
<dependency>
51+
<groupId>software.amazon.awssdk</groupId>
52+
<artifactId>s3</artifactId>
53+
<scope>compile</scope>
54+
</dependency>
55+
56+
<dependency>
57+
<artifactId>netty-nio-client</artifactId>
58+
<groupId>software.amazon.awssdk</groupId>
59+
<scope>compile</scope>
60+
</dependency>
61+
62+
63+
<dependency>
64+
<groupId>org.springframework.boot</groupId>
65+
<artifactId>spring-boot-starter-test</artifactId>
66+
<scope>test</scope>
67+
<exclusions>
68+
<exclusion>
69+
<groupId>org.junit.vintage</groupId>
70+
<artifactId>junit-vintage-engine</artifactId>
71+
</exclusion>
72+
</exclusions>
73+
</dependency>
74+
75+
76+
<dependency>
77+
<groupId>io.projectreactor</groupId>
78+
<artifactId>reactor-test</artifactId>
79+
<scope>test</scope>
80+
</dependency>
81+
<dependency>
82+
<groupId>org.springframework.boot</groupId>
83+
<artifactId>spring-boot-devtools</artifactId>
84+
<scope>runtime</scope>
85+
</dependency>
86+
<dependency>
87+
<groupId>org.springframework.boot</groupId>
88+
<artifactId>spring-boot-configuration-processor</artifactId>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.projectlombok</groupId>
92+
<artifactId>lombok</artifactId>
93+
</dependency>
94+
</dependencies>
95+
96+
<build>
97+
<plugins>
98+
<plugin>
99+
<groupId>org.springframework.boot</groupId>
100+
<artifactId>spring-boot-maven-plugin</artifactId>
101+
</plugin>
102+
</plugins>
103+
</build>
104+
105+
</project>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.baeldung.aws.reactive.s3;
2+
3+
import java.util.Optional;
4+
5+
import org.springframework.http.HttpStatus;
6+
7+
import lombok.AllArgsConstructor;
8+
import software.amazon.awssdk.core.SdkResponse;
9+
import software.amazon.awssdk.http.SdkHttpResponse;
10+
11+
@AllArgsConstructor
12+
public class DownloadFailedException extends RuntimeException {
13+
14+
private static final long serialVersionUID = 1L;
15+
16+
private int statusCode;
17+
private Optional<String> statusText;
18+
19+
public DownloadFailedException(SdkResponse response) {
20+
21+
SdkHttpResponse httpResponse = response.sdkHttpResponse();
22+
if (httpResponse != null) {
23+
this.statusCode = httpResponse.statusCode();
24+
this.statusText = httpResponse.statusText();
25+
} else {
26+
this.statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
27+
this.statusText = Optional.of("UNKNOWN");
28+
}
29+
30+
}
31+
32+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/**
2+
*
3+
*/
4+
package com.baeldung.aws.reactive.s3;
5+
6+
import java.io.InputStream;
7+
import java.nio.ByteBuffer;
8+
import java.util.Map;
9+
import java.util.Map.Entry;
10+
import java.util.concurrent.CompletableFuture;
11+
12+
import org.springframework.core.io.buffer.DataBuffer;
13+
import org.springframework.core.io.buffer.DataBufferUtils;
14+
import org.springframework.http.HttpHeaders;
15+
import org.springframework.http.ResponseEntity;
16+
import org.springframework.http.ResponseEntity.BodyBuilder;
17+
import org.springframework.web.bind.annotation.GetMapping;
18+
import org.springframework.web.bind.annotation.PathVariable;
19+
import org.springframework.web.bind.annotation.RequestMapping;
20+
import org.springframework.web.bind.annotation.RestController;
21+
22+
import lombok.extern.slf4j.Slf4j;
23+
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.Mono;
25+
import software.amazon.awssdk.core.ResponseBytes;
26+
import software.amazon.awssdk.core.SdkResponse;
27+
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
28+
import software.amazon.awssdk.core.async.SdkPublisher;
29+
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
30+
import software.amazon.awssdk.http.SdkHttpResponse;
31+
import software.amazon.awssdk.services.s3.S3AsyncClient;
32+
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
33+
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
34+
35+
/**
36+
* @author Philippe
37+
*
38+
*/
39+
@RestController
40+
@RequestMapping("/inbox")
41+
@Slf4j
42+
public class DownloadResource {
43+
44+
45+
private final S3AsyncClient s3client;
46+
private final S3ClientConfigurarionProperties s3config;
47+
48+
public DownloadResource(S3AsyncClient s3client, S3ClientConfigurarionProperties s3config) {
49+
this.s3client = s3client;
50+
this.s3config = s3config;
51+
}
52+
53+
54+
@GetMapping(path="/{filekey}")
55+
public Mono<ResponseEntity<Flux<ByteBuffer>>> downloadFile(@PathVariable("filekey") String filekey) {
56+
57+
GetObjectRequest request = GetObjectRequest.builder()
58+
.bucket(s3config.getBucket())
59+
.key(filekey)
60+
.build();
61+
62+
return Mono.fromFuture(s3client.getObject(request,new FluxResponseProvider()))
63+
.map( (response) -> {
64+
checkResult(response.sdkResponse);
65+
String filename = getMetadataItem(response.sdkResponse,"filename",filekey);
66+
67+
log.info("[I65] filename={}, length={}",filename, response.sdkResponse.contentLength() );
68+
69+
return ResponseEntity.ok()
70+
.header(HttpHeaders.CONTENT_TYPE, response.sdkResponse.contentType())
71+
.header(HttpHeaders.CONTENT_LENGTH, Long.toString(response.sdkResponse.contentLength()))
72+
.header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=\"" + filename + "\"")
73+
.body(response.flux);
74+
});
75+
}
76+
77+
/**
78+
* Lookup a metadata key in a case-insensitive way.
79+
* @param sdkResponse
80+
* @param key
81+
* @param defaultValue
82+
* @return
83+
*/
84+
private String getMetadataItem(GetObjectResponse sdkResponse, String key, String defaultValue) {
85+
for( Entry<String, String> entry : sdkResponse.metadata().entrySet()) {
86+
if ( entry.getKey().equalsIgnoreCase(key)) {
87+
return entry.getValue();
88+
}
89+
}
90+
return defaultValue;
91+
}
92+
93+
94+
// Helper used to check return codes from an API call
95+
private static void checkResult(GetObjectResponse response) {
96+
SdkHttpResponse sdkResponse = response.sdkHttpResponse();
97+
if ( sdkResponse != null && sdkResponse.isSuccessful()) {
98+
return;
99+
}
100+
101+
throw new DownloadFailedException(response);
102+
}
103+
104+
105+
static class FluxResponseProvider implements AsyncResponseTransformer<GetObjectResponse,FluxResponse> {
106+
107+
private FluxResponse response;
108+
109+
@Override
110+
public CompletableFuture<FluxResponse> prepare() {
111+
response = new FluxResponse();
112+
return response.cf;
113+
}
114+
115+
@Override
116+
public void onResponse(GetObjectResponse sdkResponse) {
117+
this.response.sdkResponse = sdkResponse;
118+
}
119+
120+
@Override
121+
public void onStream(SdkPublisher<ByteBuffer> publisher) {
122+
response.flux = Flux.from(publisher);
123+
response.cf.complete(response);
124+
}
125+
126+
@Override
127+
public void exceptionOccurred(Throwable error) {
128+
response.cf.completeExceptionally(error);
129+
}
130+
131+
}
132+
133+
/**
134+
* Holds the API response and stream
135+
* @author Philippe
136+
*/
137+
static class FluxResponse {
138+
139+
final CompletableFuture<FluxResponse> cf = new CompletableFuture<>();
140+
GetObjectResponse sdkResponse;
141+
Flux<ByteBuffer> flux;
142+
}
143+
144+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.baeldung.aws.reactive.s3;
2+
3+
import org.springframework.boot.SpringApplication;
4+
import org.springframework.boot.autoconfigure.SpringBootApplication;
5+
6+
@SpringBootApplication
7+
public class ReactiveS3Application {
8+
9+
public static void main(String[] args) {
10+
SpringApplication.run(ReactiveS3Application.class, args);
11+
}
12+
13+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package com.baeldung.aws.reactive.s3;
2+
3+
import java.net.URI;
4+
5+
import org.springframework.boot.context.properties.ConfigurationProperties;
6+
7+
import lombok.Data;
8+
import software.amazon.awssdk.regions.Region;
9+
10+
@ConfigurationProperties(prefix = "aws.s3")
11+
@Data
12+
public class S3ClientConfigurarionProperties {
13+
14+
private Region region = Region.US_EAST_1;
15+
private URI endpoint = null;
16+
17+
private String accessKeyId;
18+
private String secretAccessKey;
19+
20+
// Bucket name we'll be using as our backend storage
21+
private String bucket;
22+
23+
// AWS S3 requires that file parts must have at least 5MB, except
24+
// for the last part. This may change for other S3-compatible services, so let't
25+
// define a configuration property for that
26+
private int multipartMinPartSize = 5*1024*1024;
27+
28+
}

0 commit comments

Comments
 (0)