2 * ========================LICENSE_START=================================
4 * ======================================================================
5 * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6 * ======================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
23 import java.lang.invoke.MethodHandles;
25 import java.util.concurrent.CompletableFuture;
27 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
31 import reactor.core.publisher.Flux;
32 import reactor.core.publisher.Mono;
33 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
34 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
35 import software.amazon.awssdk.core.BytesWrapper;
36 import software.amazon.awssdk.core.ResponseBytes;
37 import software.amazon.awssdk.core.async.AsyncRequestBody;
38 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
39 import software.amazon.awssdk.regions.Region;
40 import software.amazon.awssdk.services.s3.S3AsyncClient;
41 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
42 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
43 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
44 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
45 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
46 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
47 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
48 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
49 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
50 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
51 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
52 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
53 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
54 import software.amazon.awssdk.services.s3.model.S3Object;
56 class S3ObjectStore implements DataStore {
57 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
58 private final ApplicationConfig applicationConfig;
60 private static S3AsyncClient s3AsynchClient;
61 private final String location;
63 public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
64 this.applicationConfig = applicationConfig;
65 this.location = location;
67 getS3AsynchClient(applicationConfig);
70 private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
71 if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
72 s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
74 return s3AsynchClient;
77 private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
78 URI uri = URI.create(applicationConfig.getS3EndpointOverride());
79 return S3AsyncClient.builder() //
80 .region(Region.US_EAST_1) //
81 .endpointOverride(uri) //
82 .credentialsProvider(StaticCredentialsProvider.create( //
83 AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
84 applicationConfig.getS3SecretAccessKey())));
88 public Flux<String> listObjects(String prefix) {
89 return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
90 .map(this::externalName);
94 public Mono<Boolean> deleteObject(String name) {
95 DeleteObjectRequest request = DeleteObjectRequest.builder() //
100 CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
102 return Mono.fromFuture(future).map(resp -> true);
106 public Mono<byte[]> readObject(String name) {
107 return getDataFromS3Object(bucket(), name);
111 public Mono<byte[]> writeObject(String name, byte[] fileData) {
113 PutObjectRequest request = PutObjectRequest.builder() //
118 AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
120 CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
122 return Mono.fromFuture(future) //
123 .map(putObjectResponse -> fileData) //
124 .doOnError(t -> logger.error("Failed to store object '{}' in S3 {}", key(name), t.getMessage()));
128 public Mono<String> createDataStore() {
129 return createS3Bucket(bucket());
132 private Mono<String> createS3Bucket(String s3Bucket) {
134 CreateBucketRequest request = CreateBucketRequest.builder() //
138 CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
140 return Mono.fromFuture(future) //
141 .map(f -> s3Bucket) //
142 .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
143 .onErrorResume(t -> Mono.just(s3Bucket));
147 public Mono<String> deleteAllObjects() {
148 return listObjects("") //
149 .flatMap(this::deleteObject) //
151 .map(resp -> "OK").onErrorResume(t -> Mono.just("NOK"));
154 public Mono<String> deleteBucket() {
155 DeleteBucketRequest request = DeleteBucketRequest.builder() //
159 CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
161 return Mono.fromFuture(future) //
162 .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
163 .map(resp -> bucket()) //
164 .doOnNext(resp -> logger.debug("Deleted bucket: {}", bucket())).onErrorResume(t -> Mono.just("NOK"));
167 private String bucket() {
168 return applicationConfig.getS3Bucket();
171 private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
173 return listObjectsRequest(bucket, prefix, null) //
174 .expand(response -> listObjectsRequest(bucket, prefix, response)) //
175 .map(ListObjectsResponse::contents) //
176 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
177 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
178 .flatMap(Flux::fromIterable) //
179 .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
182 private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
183 ListObjectsResponse prevResponse) {
184 ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
189 if (prevResponse != null) {
190 if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
191 builder.marker(prevResponse.nextMarker());
197 ListObjectsRequest listObjectsRequest = builder.build();
198 CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
199 return Mono.fromFuture(future);
202 private Mono<byte[]> getDataFromS3Object(String bucket, String name) {
204 GetObjectRequest request = GetObjectRequest.builder() //
209 CompletableFuture<ResponseBytes<GetObjectResponse>> future =
210 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
212 return Mono.fromFuture(future) //
213 .map(BytesWrapper::asByteArray) //
214 .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(name), bucket,
216 .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(name))) //
217 .onErrorResume(t -> Mono.empty());
220 private String key(String name) {
221 return location + "/" + name;
224 private String externalName(String internalName) {
225 return internalName.substring(key("").length());