4c7c3c3ee23aaf429858ace047c2900bab55ec84
[ccsdk/oran.git] /
1 /*-
2  * ========================LICENSE_START=================================
3  * ONAP : ccsdk oran
4  * ======================================================================
5  * Copyright (C) 2022 Nordix Foundation. All rights reserved.
6  * ======================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.onap.ccsdk.oran.a1policymanagementservice.datastore;
22
23 import java.net.URI;
24 import java.util.concurrent.CompletableFuture;
25
26 import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import reactor.core.publisher.Flux;
31 import reactor.core.publisher.Mono;
32 import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
33 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
34 import software.amazon.awssdk.core.BytesWrapper;
35 import software.amazon.awssdk.core.ResponseBytes;
36 import software.amazon.awssdk.core.async.AsyncRequestBody;
37 import software.amazon.awssdk.core.async.AsyncResponseTransformer;
38 import software.amazon.awssdk.regions.Region;
39 import software.amazon.awssdk.services.s3.S3AsyncClient;
40 import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
41 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
42 import software.amazon.awssdk.services.s3.model.CreateBucketResponse;
43 import software.amazon.awssdk.services.s3.model.DeleteBucketRequest;
44 import software.amazon.awssdk.services.s3.model.DeleteBucketResponse;
45 import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
46 import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
47 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
48 import software.amazon.awssdk.services.s3.model.GetObjectResponse;
49 import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
50 import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
51 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
52 import software.amazon.awssdk.services.s3.model.PutObjectResponse;
53 import software.amazon.awssdk.services.s3.model.S3Object;
54
55 class S3ObjectStore implements DataStore {
56     private static final Logger logger = LoggerFactory.getLogger(S3ObjectStore.class);
57     private final ApplicationConfig applicationConfig;
58
59     private static S3AsyncClient s3AsynchClient;
60     private final String location;
61
62     public S3ObjectStore(ApplicationConfig applicationConfig, String location) {
63         this.applicationConfig = applicationConfig;
64         this.location = location;
65
66         getS3AsynchClient(applicationConfig);
67     }
68
69     private static synchronized S3AsyncClient getS3AsynchClient(ApplicationConfig applicationConfig) {
70         if (applicationConfig.isS3Enabled() && s3AsynchClient == null) {
71             s3AsynchClient = getS3AsyncClientBuilder(applicationConfig).build();
72         }
73         return s3AsynchClient;
74     }
75
76     private static S3AsyncClientBuilder getS3AsyncClientBuilder(ApplicationConfig applicationConfig) {
77         URI uri = URI.create(applicationConfig.getS3EndpointOverride());
78         return S3AsyncClient.builder() //
79                 .region(Region.US_EAST_1) //
80                 .endpointOverride(uri) //
81                 .credentialsProvider(StaticCredentialsProvider.create( //
82                         AwsBasicCredentials.create(applicationConfig.getS3AccessKeyId(), //
83                                 applicationConfig.getS3SecretAccessKey())));
84     }
85
86     @Override
87     public Flux<String> listObjects(String prefix) {
88         return listObjectsInBucket(bucket(), location + "/" + prefix).map(S3Object::key) //
89                 .map(this::externalName);
90     }
91
92     @Override
93     public Mono<Boolean> deleteObject(String name) {
94         DeleteObjectRequest request = DeleteObjectRequest.builder() //
95                 .bucket(bucket()) //
96                 .key(key(name)) //
97                 .build();
98
99         CompletableFuture<DeleteObjectResponse> future = s3AsynchClient.deleteObject(request);
100
101         return Mono.fromFuture(future).map(resp -> true);
102     }
103
104     @Override
105     public Mono<byte[]> readObject(String name) {
106         return getDataFromS3Object(bucket(), name);
107     }
108
109     @Override
110     public Mono<byte[]> writeObject(String name, byte[] fileData) {
111
112         PutObjectRequest request = PutObjectRequest.builder() //
113                 .bucket(bucket()) //
114                 .key(key(name)) //
115                 .build();
116
117         AsyncRequestBody body = AsyncRequestBody.fromBytes(fileData);
118
119         CompletableFuture<PutObjectResponse> future = s3AsynchClient.putObject(request, body);
120
121         return Mono.fromFuture(future) //
122                 .map(putObjectResponse -> fileData) //
123                 .doOnError(t -> logger.error("Failed to store object '{}' in S3 {}", key(name), t.getMessage()));
124     }
125
126     @Override
127     public Mono<String> createDataStore() {
128         return createS3Bucket(bucket());
129     }
130
131     private Mono<String> createS3Bucket(String s3Bucket) {
132
133         CreateBucketRequest request = CreateBucketRequest.builder() //
134                 .bucket(s3Bucket) //
135                 .build();
136
137         CompletableFuture<CreateBucketResponse> future = s3AsynchClient.createBucket(request);
138
139         return Mono.fromFuture(future) //
140                 .map(f -> s3Bucket) //
141                 .doOnError(t -> logger.debug("Could not create S3 bucket: {}", t.getMessage()))
142                 .onErrorResume(t -> Mono.just(s3Bucket));
143     }
144
145     @Override
146     public Mono<String> deleteAllObjects() {
147         return listObjects("") //
148                 .flatMap(this::deleteObject) //
149                 .collectList() //
150                 .map(resp -> "OK").onErrorResume(t -> Mono.just("NOK"));
151     }
152
153     public Mono<String> deleteBucket() {
154         DeleteBucketRequest request = DeleteBucketRequest.builder() //
155                 .bucket(bucket()) //
156                 .build();
157
158         CompletableFuture<DeleteBucketResponse> future = s3AsynchClient.deleteBucket(request);
159
160         return Mono.fromFuture(future) //
161                 .doOnError(t -> logger.warn("Could not delete bucket: {}, reason: {}", bucket(), t.getMessage()))
162                 .map(resp -> bucket()) //
163                 .doOnNext(resp -> logger.debug("Deleted bucket: {}", bucket())).onErrorResume(t -> Mono.just("NOK"));
164     }
165
166     private String bucket() {
167         return applicationConfig.getS3Bucket();
168     }
169
170     private Flux<S3Object> listObjectsInBucket(String bucket, String prefix) {
171
172         return listObjectsRequest(bucket, prefix, null) //
173                 .expand(response -> listObjectsRequest(bucket, prefix, response)) //
174                 .map(ListObjectsResponse::contents) //
175                 .doOnNext(f -> logger.debug("Found objects in {}: {}", bucket, f.size())) //
176                 .doOnError(t -> logger.warn("Error fromlist objects: {}", t.getMessage())) //
177                 .flatMap(Flux::fromIterable) //
178                 .doOnNext(obj -> logger.debug("Found object: {}", obj.key()));
179     }
180
181     private Mono<ListObjectsResponse> listObjectsRequest(String bucket, String prefix,
182             ListObjectsResponse prevResponse) {
183         ListObjectsRequest.Builder builder = ListObjectsRequest.builder() //
184                 .bucket(bucket) //
185                 .maxKeys(1000) //
186                 .prefix(prefix);
187
188         if (prevResponse != null) {
189             if (Boolean.TRUE.equals(prevResponse.isTruncated())) {
190                 builder.marker(prevResponse.nextMarker());
191             } else {
192                 return Mono.empty();
193             }
194         }
195
196         ListObjectsRequest listObjectsRequest = builder.build();
197         CompletableFuture<ListObjectsResponse> future = s3AsynchClient.listObjects(listObjectsRequest);
198         return Mono.fromFuture(future);
199     }
200
201     private Mono<byte[]> getDataFromS3Object(String bucket, String name) {
202
203         GetObjectRequest request = GetObjectRequest.builder() //
204                 .bucket(bucket) //
205                 .key(key(name)) //
206                 .build();
207
208         CompletableFuture<ResponseBytes<GetObjectResponse>> future =
209                 s3AsynchClient.getObject(request, AsyncResponseTransformer.toBytes());
210
211         return Mono.fromFuture(future) //
212                 .map(BytesWrapper::asByteArray) //
213                 .doOnError(t -> logger.error("Failed to get file from S3, key:{}, bucket: {}, {}", key(name), bucket,
214                         t.getMessage())) //
215                 .doOnEach(n -> logger.debug("Read file from S3: {} {}", bucket, key(name))) //
216                 .onErrorResume(t -> Mono.empty());
217     }
218
219     private String key(String name) {
220         return location + "/" + name;
221     }
222
223     private String externalName(String internalName) {
224         return internalName.substring(key("").length());
225     }
226
227 }